You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2017/08/14 01:50:22 UTC

[1/4] hbase git commit: HBASE-14135 Merge backup images (Vladimir Rodionov)

Repository: hbase
Updated Branches:
  refs/heads/branch-2 b4d44467f -> 35aa7aae3
  refs/heads/master c6ac04ab3 -> 05e6e5695


http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
deleted file mode 100644
index ba1b65e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.backup.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
-import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * A tool to split HFiles into new region boundaries as a MapReduce job. The tool generates HFiles
- * for later bulk importing.
- */
-@InterfaceAudience.Private
-public class HFileSplitterJob extends Configured implements Tool {
-  private static final Log LOG = LogFactory.getLog(HFileSplitterJob.class);
-  final static String NAME = "HFileSplitterJob";
-  public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output";
-  public final static String TABLES_KEY = "hfile.input.tables";
-  public final static String TABLE_MAP_KEY = "hfile.input.tablesmap";
-  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
-
-  public HFileSplitterJob() {
-  }
-
-  protected HFileSplitterJob(final Configuration c) {
-    super(c);
-  }
-
-  /**
-   * A mapper that just writes out cells. This one can be used together with
-   * {@link KeyValueSortReducer}
-   */
-  static class HFileCellMapper extends
-      Mapper<NullWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
-
-    @Override
-    public void map(NullWritable key, KeyValue value, Context context) throws IOException,
-        InterruptedException {
-      // Convert value to KeyValue if subclass
-      if (!value.getClass().equals(KeyValue.class)) {
-        value =
-            new KeyValue(value.getRowArray(), value.getRowOffset(), value.getRowLength(),
-                value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(),
-                value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(),
-                value.getTimestamp(), Type.codeToType(value.getTypeByte()), value.getValueArray(),
-                value.getValueOffset(), value.getValueLength());
-      }
-      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value);
-    }
-
-    @Override
-    public void setup(Context context) throws IOException {
-      // do nothing
-    }
-  }
-
-  /**
-   * Sets up the actual job.
-   * @param args The command line parameters.
-   * @return The newly created job.
-   * @throws IOException When setting up the job fails.
-   */
-  public Job createSubmittableJob(String[] args) throws IOException {
-    Configuration conf = getConf();
-    String inputDirs = args[0];
-    String tabName = args[1];
-    conf.setStrings(TABLES_KEY, tabName);
-    conf.set(FileInputFormat.INPUT_DIR, inputDirs);
-    Job job =
-        Job.getInstance(conf,
-          conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
-    job.setJarByClass(HFileSplitterJob.class);
-    job.setInputFormatClass(HFileInputFormat.class);
-    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
-    if (hfileOutPath != null) {
-      LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
-      TableName tableName = TableName.valueOf(tabName);
-      job.setMapperClass(HFileCellMapper.class);
-      job.setReducerClass(KeyValueSortReducer.class);
-      Path outputDir = new Path(hfileOutPath);
-      FileOutputFormat.setOutputPath(job, outputDir);
-      job.setMapOutputValueClass(KeyValue.class);
-      try (Connection conn = ConnectionFactory.createConnection(conf);
-          Table table = conn.getTable(tableName);
-          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
-        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
-      }
-      LOG.debug("success configuring load incremental job");
-
-      TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
-        org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
-    } else {
-      throw new IOException("No bulk output directory specified");
-    }
-    return job;
-  }
-
-  /**
-   * Print usage
-   * @param errorMsg Error message. Can be null.
-   */
-  private void usage(final String errorMsg) {
-    if (errorMsg != null && errorMsg.length() > 0) {
-      System.err.println("ERROR: " + errorMsg);
-    }
-    System.err.println("Usage: " + NAME + " [options] <HFile inputdir(s)> <table>");
-    System.err.println("Read all HFile's for <table> and split them to <table> region boundaries.");
-    System.err.println("<table>  table to load.\n");
-    System.err.println("To generate HFiles for a bulk data load, pass the option:");
-    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
-    System.err.println("Other options:");
-    System.err.println("   -D " + JOB_NAME_CONF_KEY
-        + "=jobName - use the specified mapreduce job name for the HFile splitter");
-    System.err.println("For performance also consider the following options:\n"
-        + "  -Dmapreduce.map.speculative=false\n" + "  -Dmapreduce.reduce.speculative=false");
-  }
-
-  /**
-   * Main entry point.
-   * @param args The command line parameters.
-   * @throws Exception When running the job fails.
-   */
-  public static void main(String[] args) throws Exception {
-    int ret = ToolRunner.run(new HFileSplitterJob(HBaseConfiguration.create()), args);
-    System.exit(ret);
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    if (args.length < 2) {
-      usage("Wrong number of arguments: " + args.length);
-      System.exit(-1);
-    }
-    Job job = createSubmittableJob(args);
-    int result = job.waitForCompletion(true) ? 0 : 1;
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
new file mode 100644
index 0000000..00c5b83
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupMergeJob;
+import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.util.Tool;
+
+/**
+ * MapReduce implementation of {@link BackupMergeJob}
+ * Must be initialized with configuration of a backup destination cluster
+ *
+ */
+
+@InterfaceAudience.Private
+public class MapReduceBackupMergeJob implements BackupMergeJob {
+  public static final Log LOG = LogFactory.getLog(MapReduceBackupMergeJob.class);
+
+  protected Tool player;
+  protected Configuration conf;
+
+  public MapReduceBackupMergeJob() {
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public void run(String[] backupIds) throws IOException {
+    // Merges the given backup images into the most recent one: the HFile splitter job rewrites
+    // each table's files, its output replaces that table's data, the other images are deleted.
+    if (backupIds == null || backupIds.length == 0) {
+      throw new IOException("No backup images to merge");
+    }
+    // TODO : run player on remote cluster
+    player = new MapReduceHFileSplitterJob();
+    String bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
+    // Player reads all files in arbitrary directory structure and creates
+    // a Map task for each file
+    String bids = StringUtils.join(backupIds, ",");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Merge backup images " + bids);
+    }
+    List<Pair<TableName, Path>> processedTableList = new ArrayList<Pair<TableName, Path>>();
+    boolean finishedTables = false;
+    // Set once this call holds the exclusive lock: failure handling must not release a lock
+    // (or end a merge session) that this call did not start
+    boolean lockAcquired = false;
+    FileSystem fs = FileSystem.get(getConf());
+    Connection conn = ConnectionFactory.createConnection(getConf());
+    BackupSystemTable table = null;
+    try {
+      table = new BackupSystemTable(conn);
+      // Get exclusive lock on backup system
+      table.startBackupExclusiveOperation();
+      lockAcquired = true;
+      // Start merge operation
+      table.startMergeOperation(backupIds);
+      // Select most recent backup id
+      String mergedBackupId = findMostRecentBackupId(backupIds);
+      TableName[] tableNames = getTableNamesInBackupImages(backupIds);
+      // The backup root of the first image is used for all of them
+      String backupRoot = table.readBackupInfo(backupIds[0]).getBackupRootDir();
+
+      for (int i = 0; i < tableNames.length; i++) {
+        LOG.info("Merge backup images for " + tableNames[i]);
+        // Find input directories for table
+        Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
+        String dirs = StringUtils.join(dirPaths, ",");
+        Path bulkOutputPath =
+            BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(tableNames[i]),
+              getConf(), false);
+        // Delete content if exists
+        if (fs.exists(bulkOutputPath) && !fs.delete(bulkOutputPath, true)) {
+          LOG.warn("Can not delete: " + bulkOutputPath);
+        }
+        Configuration conf = getConf();
+        conf.set(bulkOutputConfKey, bulkOutputPath.toString());
+        String[] playerArgs = { dirs, tableNames[i].getNameAsString() };
+
+        player.setConf(conf);
+        int result = player.run(playerArgs);
+        if (!succeeded(result)) {
+          throw new IOException("Can not merge backup images for " + dirs
+              + " (check Hadoop/MR and HBase logs). Player return code =" + result);
+        }
+        // Add to processed table list
+        processedTableList.add(new Pair<TableName, Path>(tableNames[i], bulkOutputPath));
+        LOG.debug("Merge Job finished:" + result);
+      }
+      List<TableName> tableList = toTableNameList(processedTableList);
+      table.updateProcessedTablesForMerge(tableList);
+      finishedTables = true;
+
+      // Move data
+      for (Pair<TableName, Path> tn : processedTableList) {
+        moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
+      }
+
+      // Delete old data and update manifest
+      List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
+      deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
+      updateBackupManifest(backupRoot, mergedBackupId, backupsToDelete);
+      // Finish merge session
+      table.finishMergeOperation();
+      // Release lock
+      table.finishBackupExclusiveOperation();
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      // Pass the throwable as the second argument so that its stack trace is logged
+      LOG.error("Merge of backup images " + bids + " failed", e);
+      if (!finishedTables) {
+        // cleanup bulk directories and finish merge
+        // merge MUST be repeated (no need for repair)
+        cleanupBulkLoadDirs(fs, toPathList(processedTableList));
+        if (lockAcquired) {
+          table.finishMergeOperation();
+          table.finishBackupExclusiveOperation();
+        }
+        throw new IOException("Backup merge operation failed, you should try it again", e);
+      } else {
+        // backup repair must be run
+        throw new IOException(
+            "Backup merge operation failed, run backup repair tool to restore system's integrity",
+            e);
+      }
+    } finally {
+      try {
+        if (table != null) {
+          table.close();
+        }
+      } finally {
+        conn.close();
+      }
+    }
+  }
+
+  protected List<Path> toPathList(List<Pair<TableName, Path>> processedTableList) {
+    List<Path> bulkDirs = new ArrayList<Path>(processedTableList.size());
+    for (Pair<TableName, Path> tableAndDir : processedTableList) {
+      bulkDirs.add(tableAndDir.getSecond()); // second element: the table's bulk output dir
+    }
+    return bulkDirs;
+  }
+
+  protected List<TableName> toTableNameList(List<Pair<TableName, Path>> processedTableList) {
+    List<TableName> tables = new ArrayList<TableName>(processedTableList.size());
+    for (Pair<TableName, Path> tableAndDir : processedTableList) {
+      tables.add(tableAndDir.getFirst()); // first element: the processed table's name
+    }
+    return tables;
+  }
+
+  protected void cleanupBulkLoadDirs(FileSystem fs, List<Path> pathList) throws IOException {
+    // Recursively delete each directory; a false return from delete is only logged
+    for (Path bulkDir : pathList) {
+      if (!fs.delete(bulkDir, true)) {
+        LOG.warn("Can't delete " + bulkDir);
+      }
+    }
+  }
+
+  protected void updateBackupManifest(String backupRoot, String mergedBackupId,
+      List<String> backupsToDelete) throws IllegalArgumentException, IOException {
+    // Load the manifest of the merged image from under the backup root
+    Path rootPath = new Path(backupRoot);
+    BackupManifest mergedManifest = HBackupFileSystem.getManifest(conf, rootPath, mergedBackupId);
+    // Remove the images being deleted from its ancestors (via removeAncestors)
+    mergedManifest.getBackupImage().removeAncestors(backupsToDelete);
+    // save back
+    mergedManifest.store(conf);
+  }
+
+  protected void deleteBackupImages(List<String> backupIds, Connection conn, FileSystem fs,
+      String backupRoot) throws IOException {
+    // Step 1: remove the images' records from the backup system table
+    try (BackupSystemTable sysTable = new BackupSystemTable(conn)) {
+      for (String id : backupIds) {
+        sysTable.deleteBackupInfo(id);
+      }
+    }
+
+    // Step 2: recursively remove each image's directory under the backup root; a false
+    // return from delete is only logged
+    for (String id : backupIds) {
+      Path imageDir = HBackupFileSystem.getBackupPath(backupRoot, id);
+      boolean removed = fs.delete(imageDir, true);
+      if (!removed) {
+        LOG.warn("Could not delete " + imageDir);
+      }
+    }
+  }
+
+  protected List<String> getBackupIdsToDelete(String[] backupIds, String mergedBackupId) {
+    // Every id except the merged (surviving) one is returned, in the original order
+    List<String> toDelete = new ArrayList<String>();
+    for (int i = 0; i < backupIds.length; i++) {
+      if (!backupIds[i].equals(mergedBackupId)) {
+        toDelete.add(backupIds[i]);
+      }
+    }
+    return toDelete;
+  }
+
+  protected void moveData(FileSystem fs, String backupRoot, Path bulkOutputPath, TableName tableName,
+      String mergedBackupId) throws IllegalArgumentException, IOException {
+    // Replaces the table's data directory in the merged image with the bulk output directory
+    Path dest =
+        new Path(HBackupFileSystem.getTableBackupDataDir(backupRoot, mergedBackupId, tableName));
+    // Delete all in dest; dest may be absent when the merged image has no data for the table
+    if (fs.exists(dest) && !fs.delete(dest, true)) {
+      throw new IOException("Could not delete " + dest);
+    }
+    for (FileStatus fst : fs.listStatus(bulkOutputPath)) {
+      if (fst.isDirectory()) {
+        // One rename moves the whole output dir; a failed move must not pass silently
+        if (!fs.rename(bulkOutputPath, dest)) {
+          throw new IOException("Could not move " + bulkOutputPath + " to " + dest);
+        }
+        break;
+      }
+    }
+  }
+
+  protected String findMostRecentBackupId(String[] backupIds) {
+    // The second '_'-separated token of each id is parsed as a timestamp; the largest wins
+    long newest = Long.MIN_VALUE;
+    for (int i = 0; i < backupIds.length; i++) {
+      String[] parts = backupIds[i].split("_");
+      newest = Math.max(newest, Long.parseLong(parts[1]));
+    }
+    // The id is rebuilt from the prefix constant and the winning timestamp
+    return BackupRestoreConstants.BACKUPID_PREFIX + newest;
+  }
+
+  protected TableName[] getTableNamesInBackupImages(String[] backupIds) throws IOException {
+    // Union of the tables recorded for each image; duplicates collapse in the set, so the
+    // order of the returned array is unspecified
+    Set<TableName> union = new HashSet<TableName>();
+
+    // Uses its own connection, closed together with the system table on exit
+    try (Connection connection = ConnectionFactory.createConnection(conf);
+        BackupSystemTable sysTable = new BackupSystemTable(connection)) {
+      for (int i = 0; i < backupIds.length; i++) {
+        BackupInfo info = sysTable.readBackupInfo(backupIds[i]);
+        union.addAll(info.getTableNames());
+      }
+    }
+
+    return union.toArray(new TableName[union.size()]);
+  }
+
+  protected Path[] findInputDirectories(FileSystem fs, String backupRoot, TableName tableName,
+      String[] backupIds) throws IOException {
+    // Collect, in backupIds order, the table's data directory of every image that has one
+    List<Path> existing = new ArrayList<Path>();
+    for (int i = 0; i < backupIds.length; i++) {
+      Path candidate =
+          new Path(HBackupFileSystem.getTableBackupDataDir(backupRoot, backupIds[i], tableName));
+      if (!fs.exists(candidate)) {
+        // An image without this directory is skipped; the miss is reported at trace level
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("File: " + candidate + " does not exist.");
+        }
+        continue;
+      }
+      existing.add(candidate);
+    }
+    return existing.toArray(new Path[existing.size()]);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
new file mode 100644
index 0000000..49e8c75
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A tool to split HFiles into new region boundaries as a MapReduce job. The tool generates HFiles
+ * for later bulk importing.
+ */
+@InterfaceAudience.Private
+public class MapReduceHFileSplitterJob extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(MapReduceHFileSplitterJob.class);
+  final static String NAME = "HFileSplitterJob";
+  public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output";
+  public final static String TABLES_KEY = "hfile.input.tables";
+  public final static String TABLE_MAP_KEY = "hfile.input.tablesmap";
+  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+
+  public MapReduceHFileSplitterJob() {
+  }
+
+  protected MapReduceHFileSplitterJob(final Configuration c) {
+    super(c);
+  }
+
+  /**
+   * A mapper that just writes out cells. This one can be used together with
+   * {@link KeyValueSortReducer}
+   */
+  static class HFileCellMapper extends
+      Mapper<NullWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
+
+    @Override
+    public void map(NullWritable key, KeyValue value, Context context) throws IOException,
+        InterruptedException {
+      // Convert value to KeyValue if subclass
+      if (!value.getClass().equals(KeyValue.class)) {
+        value =
+            new KeyValue(value.getRowArray(), value.getRowOffset(), value.getRowLength(),
+                value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(),
+                value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(),
+                value.getTimestamp(), Type.codeToType(value.getTypeByte()), value.getValueArray(),
+                value.getValueOffset(), value.getValueLength());
+      }
+      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value);
+    }
+
+    @Override
+    public void setup(Context context) throws IOException {
+      // do nothing
+    }
+  }
+
+  /**
+   * Sets up the actual job.
+   * @param args The command line parameters.
+   * @return The newly created job.
+   * @throws IOException When setting up the job fails.
+   */
+  public Job createSubmittableJob(String[] args) throws IOException {
+    Configuration conf = getConf();
+    String inputDirs = args[0];
+    String tabName = args[1];
+    conf.setStrings(TABLES_KEY, tabName);
+    conf.set(FileInputFormat.INPUT_DIR, inputDirs);
+    Job job =
+        Job.getInstance(conf,
+          conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
+    job.setJarByClass(MapReduceHFileSplitterJob.class);
+    job.setInputFormatClass(HFileInputFormat.class);
+    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+    if (hfileOutPath != null) {
+      LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
+      TableName tableName = TableName.valueOf(tabName);
+      job.setMapperClass(HFileCellMapper.class);
+      job.setReducerClass(KeyValueSortReducer.class);
+      Path outputDir = new Path(hfileOutPath);
+      FileOutputFormat.setOutputPath(job, outputDir);
+      job.setMapOutputValueClass(KeyValue.class);
+      try (Connection conn = ConnectionFactory.createConnection(conf);
+          Table table = conn.getTable(tableName);
+          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
+      }
+      LOG.debug("success configuring load incremental job");
+
+      TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
+        org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
+    } else {
+      throw new IOException("No bulk output directory specified");
+    }
+    return job;
+  }
+
+  /**
+   * Prints the usage text of this tool to stderr, preceded by an ERROR line when one is given.
+   * @param errorMsg Error message; when null or empty no ERROR line is printed.
+   */
+  private void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    System.err.println("Usage: " + NAME + " [options] <HFile inputdir(s)> <table>");
+    System.err.println("Read all HFile's for <table> and split them to <table> region boundaries.");
+    System.err.println("<table>  table to load.\n");
+    System.err.println("To generate HFiles for a bulk data load, pass the option:");
+    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
+    System.err.println("Other options:");
+    System.err.println("   -D " + JOB_NAME_CONF_KEY
+        + "=jobName - use the specified mapreduce job name for the HFile splitter");
+    System.err.println("For performance also consider the following options:\n"
+        + "  -Dmapreduce.map.speculative=false\n" + "  -Dmapreduce.reduce.speculative=false");
+  }
+
+  /**
+   * Main entry point.
+   * @param args The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    int ret = ToolRunner.run(new MapReduceHFileSplitterJob(HBaseConfiguration.create()), args);
+    System.exit(ret);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      usage("Wrong number of arguments: " + args.length);
+      // Return instead of exiting: run() is also called in-process by other backup jobs
+      return -1;
+    }
+    // Waits for the job to complete: 0 when it succeeded, 1 otherwise
+    return createSubmittableJob(args).waitForCompletion(true) ? 0 : 1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
index 4161ca9..1209e7c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
@@ -17,31 +17,31 @@
  */
 package org.apache.hadoop.hbase.backup.mapreduce;
 
+import static org.apache.hadoop.hbase.backup.util.BackupUtils.failed;
+import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
+
 import java.io.IOException;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
 import org.apache.hadoop.hbase.backup.RestoreJob;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.mapreduce.WALPlayer;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.Tool;
 
+
 /**
  * MapReduce implementation of {@link RestoreJob}
  *
- * For full backup restore, it runs {@link HFileSplitterJob} job and creates
+ * For backup restore, it runs {@link MapReduceHFileSplitterJob} job and creates
  * HFiles which are aligned with a region boundaries of a table being
- * restored, for incremental backup restore it runs {@link WALPlayer} in
- * bulk load mode (creates HFiles from WAL edits).
+ * restored.
  *
  * The resulting HFiles then are loaded using HBase bulk load tool
  * {@link LoadIncrementalHFiles}
@@ -62,8 +62,8 @@ public class MapReduceRestoreJob implements RestoreJob {
 
     String bulkOutputConfKey;
 
-    player = new HFileSplitterJob();
-    bulkOutputConfKey = HFileSplitterJob.BULK_OUTPUT_CONF_KEY;
+    player = new MapReduceHFileSplitterJob();
+    bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
     // Player reads all files in arbitrary directory structure and creates
     // a Map task for each file
     String dirs = StringUtils.join(dirPaths, ",");
@@ -71,8 +71,8 @@ public class MapReduceRestoreJob implements RestoreJob {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Restore " + (fullBackupRestore ? "full" : "incremental")
           + " backup from directory " + dirs + " from hbase tables "
-          + StringUtils.join(tableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND) +
-          " to tables "
+          + StringUtils.join(tableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND)
+          + " to tables "
           + StringUtils.join(newTableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND));
     }
 
@@ -80,13 +80,16 @@ public class MapReduceRestoreJob implements RestoreJob {
 
       LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]);
 
-      Path bulkOutputPath = getBulkOutputDir(getFileNameCompatibleString(newTableNames[i]));
+      Path bulkOutputPath =
+          BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(newTableNames[i]),
+            getConf());
       Configuration conf = getConf();
       conf.set(bulkOutputConfKey, bulkOutputPath.toString());
       String[] playerArgs =
-        { dirs,
-          fullBackupRestore? newTableNames[i].getNameAsString():tableNames[i].getNameAsString()
-        };
+          {
+              dirs,
+              fullBackupRestore ? newTableNames[i].getNameAsString() : tableNames[i]
+                  .getNameAsString() };
 
       int result = 0;
       int loaderResult = 0;
@@ -96,7 +99,7 @@ public class MapReduceRestoreJob implements RestoreJob {
         result = player.run(playerArgs);
         if (succeeded(result)) {
           // do bulk load
-          LoadIncrementalHFiles loader = createLoader(getConf());
+          LoadIncrementalHFiles loader = BackupUtils.createLoader(getConf());
           if (LOG.isDebugEnabled()) {
             LOG.debug("Restoring HFiles from directory " + bulkOutputPath);
           }
@@ -113,58 +116,11 @@ public class MapReduceRestoreJob implements RestoreJob {
         }
         LOG.debug("Restore Job finished:" + result);
       } catch (Exception e) {
+        LOG.error(e);
         throw new IOException("Can not restore from backup directory " + dirs
             + " (check Hadoop and HBase logs) ", e);
       }
-
-    }
-  }
-
-  private String getFileNameCompatibleString(TableName table) {
-    return table.getNamespaceAsString() + "-" + table.getQualifierAsString();
-  }
-
-  private boolean failed(int result) {
-    return result != 0;
-  }
-
-  private boolean succeeded(int result) {
-    return result == 0;
-  }
-
-  public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException {
-    // set configuration for restore:
-    // LoadIncrementalHFile needs more time
-    // <name>hbase.rpc.timeout</name> <value>600000</value>
-    // calculates
-    Integer milliSecInHour = 3600000;
-    Configuration conf = new Configuration(config);
-    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour);
-
-    // By default, it is 32 and loader will fail if # of files in any region exceed this
-    // limit. Bad for snapshot restore.
-    conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
-    conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
-    LoadIncrementalHFiles loader = null;
-    try {
-      loader = new LoadIncrementalHFiles(conf);
-    } catch (Exception e) {
-      throw new IOException(e);
     }
-    return loader;
-  }
-
-  private Path getBulkOutputDir(String tableName) throws IOException {
-    Configuration conf = getConf();
-    FileSystem fs = FileSystem.get(conf);
-    String tmp =
-        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
-          HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
-    Path path =
-        new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-"
-            + EnvironmentEdgeManager.currentTime());
-    fs.deleteOnExit(path);
-    return path;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
index e32853d..ce77645 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -56,7 +56,9 @@ import org.apache.hadoop.hbase.backup.impl.BackupManifest;
 import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@@ -68,14 +70,15 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 public final class BackupUtils {
   protected static final Log LOG = LogFactory.getLog(BackupUtils.class);
   public static final String LOGNAME_SEPARATOR = ".";
+  public static final int MILLISEC_IN_HOUR = 3600000;
 
   private BackupUtils() {
     throw new AssertionError("Instantiating utility class...");
   }
 
   /**
-   * Loop through the RS log timestamp map for the tables, for each RS, find the min timestamp
-   * value for the RS among the tables.
+   * Loop through the RS log timestamp map for the tables, for each RS, find the min timestamp value
+   * for the RS among the tables.
    * @param rsLogTimestampMap timestamp map
    * @return the min timestamp of each RS
    */
@@ -114,16 +117,17 @@ public final class BackupUtils {
   }
 
   /**
-   * copy out Table RegionInfo into incremental backup image need to consider move this
-   * logic into HBackupFileSystem
+   * copy out Table RegionInfo into incremental backup image need to consider move this logic into
+   * HBackupFileSystem
    * @param conn connection
    * @param backupInfo backup info
    * @param conf configuration
    * @throws IOException exception
    * @throws InterruptedException exception
    */
-  public static void copyTableRegionInfo(Connection conn, BackupInfo backupInfo,
-      Configuration conf) throws IOException, InterruptedException {
+  public static void
+      copyTableRegionInfo(Connection conn, BackupInfo backupInfo, Configuration conf)
+          throws IOException, InterruptedException {
     Path rootDir = FSUtils.getRootDir(conf);
     FileSystem fs = rootDir.getFileSystem(conf);
 
@@ -152,10 +156,8 @@ public final class BackupUtils {
       LOG.debug("Starting to write region info for table " + table);
       for (HRegionInfo regionInfo : regions) {
         Path regionDir =
-            HRegion.getRegionDir(new Path(backupInfo.getTableBackupDir(table)),
-              regionInfo);
-        regionDir =
-            new Path(backupInfo.getTableBackupDir(table), regionDir.getName());
+            HRegion.getRegionDir(new Path(backupInfo.getTableBackupDir(table)), regionInfo);
+        regionDir = new Path(backupInfo.getTableBackupDir(table), regionDir.getName());
         writeRegioninfoOnFilesystem(conf, targetFs, regionDir, regionInfo);
       }
       LOG.debug("Finished writing region info for table " + table);
@@ -301,7 +303,6 @@ public final class BackupUtils {
     return ret;
   }
 
-
   /**
    * Check whether the backup path exist
    * @param backupStr backup
@@ -431,8 +432,7 @@ public final class BackupUtils {
    * @param conf configuration
    * @throws IOException exception
    */
-  private static void cleanupHLogDir(BackupInfo backupInfo, Configuration conf)
-      throws IOException {
+  private static void cleanupHLogDir(BackupInfo backupInfo, Configuration conf) throws IOException {
 
     String logDir = backupInfo.getHLogTargetDir();
     if (logDir == null) {
@@ -452,7 +452,6 @@ public final class BackupUtils {
     }
   }
 
-
   private static void cleanupTargetDir(BackupInfo backupInfo, Configuration conf) {
     try {
       // clean up the data at target directory
@@ -498,8 +497,8 @@ public final class BackupUtils {
    * @param tableName table name
    * @return backupPath String for the particular table
    */
-  public static String getTableBackupDir(String backupRootDir, String backupId,
-      TableName tableName) {
+  public static String
+      getTableBackupDir(String backupRootDir, String backupId, TableName tableName) {
     return backupRootDir + Path.SEPARATOR + backupId + Path.SEPARATOR
         + tableName.getNamespaceAsString() + Path.SEPARATOR + tableName.getQualifierAsString()
         + Path.SEPARATOR;
@@ -523,7 +522,6 @@ public final class BackupUtils {
     return list;
   }
 
-
   /**
    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal This accommodates
    * differences between hadoop versions, where hadoop 1 does not throw a FileNotFoundException, and
@@ -655,19 +653,16 @@ public final class BackupUtils {
    * @param backupId backup id
    * @param check check only
    * @param fromTables table list from
-   * @param toTables   table list to
+   * @param toTables table list to
    * @param isOverwrite overwrite data
    * @return request obkect
    */
   public static RestoreRequest createRestoreRequest(String backupRootDir, String backupId,
       boolean check, TableName[] fromTables, TableName[] toTables, boolean isOverwrite) {
     RestoreRequest.Builder builder = new RestoreRequest.Builder();
-    RestoreRequest request = builder.withBackupRootDir(backupRootDir)
-                                    .withBackupId(backupId)
-                                    .withCheck(check)
-                                    .withFromTables(fromTables)
-                                    .withToTables(toTables)
-                                    .withOvewrite(isOverwrite).build();
+    RestoreRequest request =
+        builder.withBackupRootDir(backupRootDir).withBackupId(backupId).withCheck(check)
+            .withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite).build();
     return request;
   }
 
@@ -699,4 +694,54 @@ public final class BackupUtils {
     return isValid;
   }
 
+  public static Path getBulkOutputDir(String tableName, Configuration conf, boolean deleteOnExit)
+      throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    String tmp =
+        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
+    Path path =
+        new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-"
+            + EnvironmentEdgeManager.currentTime());
+    if (deleteOnExit) {
+      fs.deleteOnExit(path);
+    }
+    return path;
+  }
+
+  public static Path getBulkOutputDir(String tableName, Configuration conf) throws IOException {
+    return getBulkOutputDir(tableName, conf, true);
+  }
+
+  public static String getFileNameCompatibleString(TableName table) {
+    return table.getNamespaceAsString() + "-" + table.getQualifierAsString();
+  }
+
+  public static boolean failed(int result) {
+    return result != 0;
+  }
+
+  public static boolean succeeded(int result) {
+    return result == 0;
+  }
+
+  public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException {
+    // Set configuration for restore (on a copy, so the caller's config is not modified):
+    // LoadIncrementalHFiles needs more time, so raise the RPC timeout
+    // <name>hbase.rpc.timeout</name> to MILLISEC_IN_HOUR
+    // (<value>3600000</value> ms, i.e. one hour).
+    Configuration conf = new Configuration(config);
+    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, MILLISEC_IN_HOUR);
+
+    // By default, it is 32 and loader will fail if # of files in any region exceed this
+    // limit. Bad for snapshot restore.
+    conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
+    conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
+    LoadIncrementalHFiles loader = null;
+    try {
+      loader = new LoadIncrementalHFiles(conf);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    return loader;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
new file mode 100644
index 0000000..7011ed3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob;
+import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+@Category(LargeTests.class)
+public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {
+  private static final Log LOG = LogFactory.getLog(TestIncrementalBackupMergeWithFailures.class);
+
+  static enum FailurePhase {
+    PHASE1, PHASE2, PHASE3, PHASE4
+  }
+  public final static String FAILURE_PHASE_KEY = "failurePhase";
+
+  static class BackupMergeJobWithFailures extends MapReduceBackupMergeJob {
+
+    FailurePhase failurePhase;
+
+    @Override
+    public void setConf(Configuration conf) {
+      super.setConf(conf);
+      String val = conf.get(FAILURE_PHASE_KEY);
+      if (val != null) {
+        failurePhase = FailurePhase.valueOf(val);
+      } else {
+        Assert.fail("Failure phase is not set");
+      }
+    }
+
+
+    /**
+     * This is the exact copy of parent's run() with injections
+     * of different types of failures
+     */
+    @Override
+    public void run(String[] backupIds) throws IOException {
+      String bulkOutputConfKey;
+
+      // TODO : run player on remote cluster
+      player = new MapReduceHFileSplitterJob();
+      bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
+      // Player reads all files in arbitrary directory structure and creates
+      // a Map task for each file
+      String bids = StringUtils.join(backupIds, ",");
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Merge backup images " + bids);
+      }
+
+      List<Pair<TableName, Path>> processedTableList = new ArrayList<Pair<TableName, Path>>();
+      boolean finishedTables = false;
+      Connection conn = ConnectionFactory.createConnection(getConf());
+      BackupSystemTable table = new BackupSystemTable(conn);
+      FileSystem fs = FileSystem.get(getConf());
+
+      try {
+
+        // Start backup exclusive operation
+        table.startBackupExclusiveOperation();
+        // Start merge operation
+        table.startMergeOperation(backupIds);
+
+        // Select most recent backup id
+        String mergedBackupId = findMostRecentBackupId(backupIds);
+
+        TableName[] tableNames = getTableNamesInBackupImages(backupIds);
+        String backupRoot = null;
+
+        BackupInfo bInfo = table.readBackupInfo(backupIds[0]);
+        backupRoot = bInfo.getBackupRootDir();
+        // PHASE 1
+        checkFailure(FailurePhase.PHASE1);
+
+        for (int i = 0; i < tableNames.length; i++) {
+
+          LOG.info("Merge backup images for " + tableNames[i]);
+
+          // Find input directories for table
+
+          Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
+          String dirs = StringUtils.join(dirPaths, ",");
+          Path bulkOutputPath =
+              BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(tableNames[i]),
+                getConf(), false);
+          // Delete content if exists
+          if (fs.exists(bulkOutputPath)) {
+            if (!fs.delete(bulkOutputPath, true)) {
+              LOG.warn("Can not delete: " + bulkOutputPath);
+            }
+          }
+          Configuration conf = getConf();
+          conf.set(bulkOutputConfKey, bulkOutputPath.toString());
+          String[] playerArgs = { dirs, tableNames[i].getNameAsString() };
+
+          int result = 0;
+          // PHASE 2
+          checkFailure(FailurePhase.PHASE2);
+          player.setConf(getConf());
+          result = player.run(playerArgs);
+          if (succeeded(result)) {
+            // Add to processed table list
+            processedTableList.add(new Pair<TableName, Path>(tableNames[i], bulkOutputPath));
+          } else {
+            throw new IOException("Can not merge backup images for " + dirs
+                + " (check Hadoop/MR and HBase logs). Player return code =" + result);
+          }
+          LOG.debug("Merge Job finished:" + result);
+        }
+        List<TableName> tableList = toTableNameList(processedTableList);
+        // PHASE 3
+        checkFailure(FailurePhase.PHASE3);
+        table.updateProcessedTablesForMerge(tableList);
+        finishedTables = true;
+
+        // Move data
+        for (Pair<TableName, Path> tn : processedTableList) {
+          moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
+        }
+        // PHASE 4
+        checkFailure(FailurePhase.PHASE4);
+        // Delete old data and update manifest
+        List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
+        deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
+        updateBackupManifest(backupRoot, mergedBackupId, backupsToDelete);
+        // Finish merge session
+        table.finishMergeOperation();
+      } catch (RuntimeException e) {
+        throw e;
+      } catch (Exception e) {
+        LOG.error(e);
+        if (!finishedTables) {
+          // cleanup bulk directories and finish merge
+          // merge MUST be repeated (no need for repair)
+          cleanupBulkLoadDirs(fs, toPathList(processedTableList));
+          table.finishMergeOperation();
+          table.finishBackupExclusiveOperation();
+          throw new IOException("Backup merge operation failed, you should try it again", e);
+        } else {
+          // backup repair must be run
+          throw new IOException(
+              "Backup merge operation failed, run backup repair tool to restore system's integrity",
+              e);
+        }
+      } finally {
+        table.close();
+        conn.close();
+      }
+
+    }
+
+    private void checkFailure(FailurePhase phase) throws IOException {
+      if ( failurePhase != null && failurePhase == phase) {
+        throw new IOException (phase.toString());
+      }
+    }
+
+  }
+
+
+  @Test
+  public void TestIncBackupMergeRestore() throws Exception {
+
+    int ADD_ROWS = 99;
+    // #1 - create full backup for all tables
+    LOG.info("create full backup image for all tables");
+
+    List<TableName> tables = Lists.newArrayList(table1, table2);
+    // Set custom Merge Job implementation
+    conf1.setClass(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS,
+      BackupMergeJobWithFailures.class, BackupMergeJob.class);
+
+    Connection conn = ConnectionFactory.createConnection(conf1);
+
+    HBaseAdmin admin = null;
+    admin = (HBaseAdmin) conn.getAdmin();
+    BackupAdminImpl client = new BackupAdminImpl(conn);
+
+    BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
+    String backupIdFull = client.backupTables(request);
+
+    assertTrue(checkSucceeded(backupIdFull));
+
+    // #2 - insert some data to table1
+    HTable t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
+
+    Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS);
+    t1.close();
+    LOG.debug("written " + ADD_ROWS + " rows to " + table1);
+
+    HTable t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);
+
+    Assert.assertEquals(TEST_UTIL.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS);
+    t2.close();
+    LOG.debug("written " + ADD_ROWS + " rows to " + table2);
+
+    // #3 - incremental backup for multiple tables
+    tables = Lists.newArrayList(table1, table2);
+    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
+    String backupIdIncMultiple = client.backupTables(request);
+
+    assertTrue(checkSucceeded(backupIdIncMultiple));
+
+    t1 = insertIntoTable(conn, table1, famName, 2, ADD_ROWS);
+    t1.close();
+
+    t2 = insertIntoTable(conn, table2, famName, 2, ADD_ROWS);
+    t2.close();
+
+    // #3 - incremental backup for multiple tables
+    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
+    String backupIdIncMultiple2 = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple2));
+
+    // #4 Merge backup images with failures
+
+    for ( FailurePhase phase : FailurePhase.values()) {
+      Configuration conf = conn.getConfiguration();
+
+      conf.set(FAILURE_PHASE_KEY, phase.toString());
+
+      try (BackupAdmin bAdmin = new BackupAdminImpl(conn);)
+      {
+        String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
+        bAdmin.mergeBackups(backups);
+        Assert.fail("Expected IOException");
+      } catch (IOException e) {
+        BackupSystemTable table = new BackupSystemTable(conn);
+        if(phase.ordinal() < FailurePhase.PHASE4.ordinal()) {
+          // No need to repair:
+          // Both Merge and backup exclusive operations are finished
+          assertFalse(table.isMergeInProgress());
+          try {
+            table.finishBackupExclusiveOperation();
+            Assert.fail("IOException is expected");
+          } catch(IOException ee) {
+            // Expected
+          }
+        } else {
+          // Repair is required
+          assertTrue(table.isMergeInProgress());
+          try {
+            table.startBackupExclusiveOperation();
+            Assert.fail("IOException is expected");
+          } catch(IOException ee) {
+            // Expected - clean up before proceeding
+            table.finishMergeOperation();
+            table.finishBackupExclusiveOperation();
+          }
+        }
+        table.close();
+        LOG.debug("Expected :"+ e.getMessage());
+      }
+    }
+
+    // Now merge w/o failures
+    Configuration conf = conn.getConfiguration();
+    conf.unset(FAILURE_PHASE_KEY);
+    conf.unset(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS);
+
+    try (BackupAdmin bAdmin = new BackupAdminImpl(conn);) {
+      String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
+      bAdmin.mergeBackups(backups);
+    }
+
+    // #6 - restore incremental backup for multiple tables, with overwrite
+    TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 };
+    TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, table2_restore };
+    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple2, false,
+      tablesRestoreIncMultiple, tablesMapIncMultiple, true));
+
+    Table hTable = conn.getTable(table1_restore);
+    LOG.debug("After incremental restore: " + hTable.getTableDescriptor());
+    LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows");
+    Assert.assertEquals(TEST_UTIL.countRows(hTable, famName), NB_ROWS_IN_BATCH + 2 * ADD_ROWS);
+
+    hTable.close();
+
+    hTable = conn.getTable(table2_restore);
+    Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH + 2 * ADD_ROWS);
+    hTable.close();
+
+    admin.close();
+    conn.close();
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java
index 9c47641..556521f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java
@@ -74,7 +74,7 @@ public class TestRepairAfterFailedDelete extends TestBackupBase {
     admin.restoreSnapshot(snapshotName);
     admin.enableTable(BackupSystemTable.getTableName(conf1));
     // Start backup session
-    table.startBackupSession();
+    table.startBackupExclusiveOperation();
     // Start delete operation
     table.startDeleteOperation(backupIds);
 


[2/4] hbase git commit: HBASE-14135 Merge backup images (Vladimir Rodionov)

Posted by el...@apache.org.
HBASE-14135 Merge backup images (Vladimir Rodionov)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/05e6e569
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/05e6e569
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/05e6e569

Branch: refs/heads/master
Commit: 05e6e5695089640006d06c2f74126b50a73363b7
Parents: c6ac04a
Author: Josh Elser <el...@apache.org>
Authored: Sun Aug 13 20:55:58 2017 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Sun Aug 13 20:55:58 2017 -0400

----------------------------------------------------------------------
 .../apache/hadoop/hbase/backup/BackupAdmin.java |  20 +-
 .../hadoop/hbase/backup/BackupDriver.java       |   2 +
 .../apache/hadoop/hbase/backup/BackupInfo.java  |   5 +
 .../hadoop/hbase/backup/BackupMergeJob.java     |  40 +++
 .../hbase/backup/BackupRestoreFactory.java      |  20 +-
 .../hadoop/hbase/backup/HBackupFileSystem.java  |  57 ++--
 .../hbase/backup/impl/BackupAdminImpl.java      | 213 +++++++++---
 .../hbase/backup/impl/BackupCommands.java       | 163 ++++++---
 .../hadoop/hbase/backup/impl/BackupManager.java |  21 +-
 .../hbase/backup/impl/BackupManifest.java       |  24 +-
 .../hbase/backup/impl/BackupSystemTable.java    | 314 ++++++++++-------
 .../hbase/backup/impl/RestoreTablesClient.java  |  32 +-
 .../backup/mapreduce/HFileSplitterJob.java      | 181 ----------
 .../mapreduce/MapReduceBackupMergeJob.java      | 321 ++++++++++++++++++
 .../mapreduce/MapReduceHFileSplitterJob.java    | 181 ++++++++++
 .../backup/mapreduce/MapReduceRestoreJob.java   |  84 ++---
 .../hadoop/hbase/backup/util/BackupUtils.java   |  93 +++--
 .../TestIncrementalBackupMergeWithFailures.java | 336 +++++++++++++++++++
 .../backup/TestRepairAfterFailedDelete.java     |   2 +-
 19 files changed, 1574 insertions(+), 535 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java
index 6f642a4..9dc6382 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java
@@ -38,8 +38,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 public interface BackupAdmin extends Closeable {
 
   /**
-   * Backup given list of tables fully. This is a synchronous operation.
-   * It returns backup id on success or throw exception on failure.
+   * Backup given list of tables fully. This is a synchronous operation. It returns backup id on
+   * success or throws an exception on failure.
    * @param userRequest BackupRequest instance
    * @return the backup Id
    */
@@ -61,16 +61,24 @@ public interface BackupAdmin extends Closeable {
    */
   BackupInfo getBackupInfo(String backupId) throws IOException;
 
-
   /**
    * Delete backup image command
-   * @param backupIds backup id list
+   * @param backupIds array of backup ids
    * @return total number of deleted sessions
    * @throws IOException exception
    */
   int deleteBackups(String[] backupIds) throws IOException;
 
   /**
+   * Merge backup images command
+   * @param backupIds array of backup ids of images to be merged.
+   *        The resulting backup image will have the same backup id as the most
+   *        recent image from a list of images to be merged.
+   * @throws IOException exception
+   */
+  void mergeBackups(String[] backupIds) throws IOException;
+
+  /**
    * Show backup history command
    * @param n last n backup sessions
    * @return list of backup info objects
@@ -113,7 +121,7 @@ public interface BackupAdmin extends Closeable {
   /**
    * Add tables to backup set command
    * @param name name of backup set.
-   * @param tables list of tables to be added to this set.
+   * @param tables array of tables to be added to this set.
    * @throws IOException exception
    */
   void addToBackupSet(String name, TableName[] tables) throws IOException;
@@ -121,7 +129,7 @@ public interface BackupAdmin extends Closeable {
   /**
    * Remove tables from backup set
    * @param name name of backup set.
-   * @param tables list of tables to be removed from this set.
+   * @param tables array of tables to be removed from this set.
    * @throws IOException exception
    */
   void removeFromBackupSet(String name, TableName[] tables) throws IOException;

http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java
index e2cdb2f..9dd8531 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java
@@ -111,6 +111,8 @@ public class BackupDriver extends AbstractHBaseTool {
       type = BackupCommand.SET;
     } else if (BackupCommand.REPAIR.name().equalsIgnoreCase(cmd)) {
       type = BackupCommand.REPAIR;
+    } else if (BackupCommand.MERGE.name().equalsIgnoreCase(cmd)) {
+      type = BackupCommand.MERGE;
     } else {
       System.out.println("Unsupported command for backup: " + cmd);
       printToolUsage();

http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
index f6a1fe4..1765bf3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
@@ -433,6 +433,11 @@ public class BackupInfo implements Comparable<BackupInfo> {
     }
   }
 
+  @Override
+  public String toString() {
+    return backupId;
+  }
+
   public byte[] toByteArray() throws IOException {
     return toProtosBackupInfo().toByteArray();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupMergeJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupMergeJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupMergeJob.java
new file mode 100644
index 0000000..136782f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupMergeJob.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Backup merge operation job interface. Concrete implementation is provided by backup provider, see
+ * {@link BackupRestoreFactory}
+ */
+
+@InterfaceAudience.Private
+public interface BackupMergeJob extends Configurable {
+
+  /**
+   * Run backup merge operation
+   * @param backupIds array of backup image ids to be merged
+   * @throws IOException if the merge operation fails
+   */
+  void run(String[] backupIds) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java
index 6d8967a..d72c884 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.backup;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
+import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob;
 import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreJob;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -32,6 +33,7 @@ public final class BackupRestoreFactory {
 
   public final static String HBASE_INCR_RESTORE_IMPL_CLASS = "hbase.incremental.restore.class";
   public final static String HBASE_BACKUP_COPY_IMPL_CLASS = "hbase.backup.copy.class";
+  public final static String HBASE_BACKUP_MERGE_IMPL_CLASS = "hbase.backup.merge.class";
 
   private BackupRestoreFactory() {
     throw new AssertionError("Instantiating utility class...");
@@ -40,7 +42,7 @@ public final class BackupRestoreFactory {
   /**
    * Gets backup restore job
    * @param conf configuration
-   * @return backup restore task instance
+   * @return backup restore job instance
    */
   public static RestoreJob getRestoreJob(Configuration conf) {
     Class<? extends RestoreJob> cls =
@@ -53,7 +55,7 @@ public final class BackupRestoreFactory {
   /**
    * Gets backup copy job
    * @param conf configuration
-   * @return backup copy task
+   * @return backup copy job instance
    */
   public static BackupCopyJob getBackupCopyJob(Configuration conf) {
     Class<? extends BackupCopyJob> cls =
@@ -63,4 +65,18 @@ public final class BackupRestoreFactory {
     service.setConf(conf);
     return service;
   }
+
+  /**
+   * Gets backup merge job
+   * @param conf configuration
+   * @return backup merge job instance
+   */
+  public static BackupMergeJob getBackupMergeJob(Configuration conf) {
+    Class<? extends BackupMergeJob> cls =
+        conf.getClass(HBASE_BACKUP_MERGE_IMPL_CLASS, MapReduceBackupMergeJob.class,
+          BackupMergeJob.class);
+    BackupMergeJob service = ReflectionUtils.newInstance(cls, conf);
+    service.setConf(conf);
+    return service;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
index 46044db..1c43e88 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
@@ -49,8 +49,8 @@ public class HBackupFileSystem {
   /**
    * Given the backup root dir, backup id and the table name, return the backup image location,
    * which is also where the backup manifest file is. return value look like:
-   * "hdfs://backup.hbase.org:9000/user/biadmin/backup/backup_1396650096738/default/t1_dn/",
-   * where "hdfs://backup.hbase.org:9000/user/biadmin/backup" is a backup root directory
+   * "hdfs://backup.hbase.org:9000/user/biadmin/backup/backup_1396650096738/default/t1_dn/", where
+   * "hdfs://backup.hbase.org:9000/user/biadmin/backup" is a backup root directory
    * @param backupRootDir backup root directory
    * @param backupId backup id
    * @param tableName table name
@@ -63,18 +63,26 @@ public class HBackupFileSystem {
         + Path.SEPARATOR;
   }
 
+  public static String getTableBackupDataDir(String backupRootDir, String backupId,
+      TableName tableName) {
+    return getTableBackupDir(backupRootDir, backupId, tableName) + Path.SEPARATOR + "data";
+  }
+
+  public static Path getBackupPath(String backupRootDir, String backupId) {
+    return new Path(backupRootDir + Path.SEPARATOR + backupId);
+  }
+
   /**
    * Given the backup root dir, backup id and the table name, return the backup image location,
    * which is also where the backup manifest file is. return value look like:
-   * "hdfs://backup.hbase.org:9000/user/biadmin/backup/backup_1396650096738/default/t1_dn/",
-   * where "hdfs://backup.hbase.org:9000/user/biadmin/backup" is a backup root directory
+   * "hdfs://backup.hbase.org:9000/user/biadmin/backup/backup_1396650096738/default/t1_dn/", where
+   * "hdfs://backup.hbase.org:9000/user/biadmin/backup" is a backup root directory
    * @param backupRootPath backup root path
    * @param tableName table name
    * @param backupId backup Id
    * @return backupPath for the particular table
    */
-  public static Path getTableBackupPath(TableName tableName,
-      Path backupRootPath, String backupId) {
+  public static Path getTableBackupPath(TableName tableName, Path backupRootPath, String backupId) {
     return new Path(getTableBackupDir(backupRootPath.toString(), backupId, tableName));
   }
 
@@ -94,33 +102,30 @@ public class HBackupFileSystem {
     return new Path(getLogBackupDir(backupRootDir, backupId));
   }
 
-  private static Path getManifestPath(TableName tableName, Configuration conf, Path backupRootPath,
-      String backupId) throws IOException {
-    Path manifestPath =
-        new Path(getTableBackupPath(tableName, backupRootPath, backupId),
-            BackupManifest.MANIFEST_FILE_NAME);
+  // TODO we do not keep WAL files anymore
+  // Move manifest file to other place
+  private static Path getManifestPath(Configuration conf, Path backupRootPath, String backupId)
+      throws IOException {
+    Path manifestPath = null;
 
     FileSystem fs = backupRootPath.getFileSystem(conf);
+    manifestPath =
+        new Path(getBackupPath(backupRootPath.toString(), backupId) + Path.SEPARATOR
+            + BackupManifest.MANIFEST_FILE_NAME);
     if (!fs.exists(manifestPath)) {
-      // check log dir for incremental backup case
-      manifestPath =
-          new Path(getLogBackupDir(backupRootPath.toString(), backupId) + Path.SEPARATOR
-              + BackupManifest.MANIFEST_FILE_NAME);
-      if (!fs.exists(manifestPath)) {
-        String errorMsg =
-            "Could not find backup manifest " + BackupManifest.MANIFEST_FILE_NAME + " for "
-                + backupId + ". File " + manifestPath + " does not exists. Did " + backupId
-                + " correspond to previously taken backup ?";
-        throw new IOException(errorMsg);
-      }
+      String errorMsg =
+          "Could not find backup manifest " + BackupManifest.MANIFEST_FILE_NAME + " for "
+              + backupId + ". File " + manifestPath + " does not exists. Did " + backupId
+              + " correspond to previously taken backup ?";
+      throw new IOException(errorMsg);
     }
     return manifestPath;
   }
 
-  public static BackupManifest getManifest(TableName tableName, Configuration conf,
-      Path backupRootPath, String backupId) throws IOException {
+  public static BackupManifest
+      getManifest(Configuration conf, Path backupRootPath, String backupId) throws IOException {
     BackupManifest manifest =
-        new BackupManifest(conf, getManifestPath(tableName, conf, backupRootPath, backupId));
+        new BackupManifest(conf, getManifestPath(conf, backupRootPath, backupId));
     return manifest;
   }
 
@@ -134,7 +139,7 @@ public class HBackupFileSystem {
       TableName[] tableArray, Configuration conf, Path backupRootPath, String backupId)
       throws IOException {
     for (TableName tableName : tableArray) {
-      BackupManifest manifest = getManifest(tableName, conf, backupRootPath, backupId);
+      BackupManifest manifest = getManifest(conf, backupRootPath, backupId);
       backupManifestMap.put(tableName, manifest);
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
index 6e35d92..99fb06c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.backup.impl;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -36,8 +37,10 @@ import org.apache.hadoop.hbase.backup.BackupAdmin;
 import org.apache.hadoop.hbase.backup.BackupClientFactory;
 import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
+import org.apache.hadoop.hbase.backup.BackupMergeJob;
 import org.apache.hadoop.hbase.backup.BackupRequest;
 import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.HBackupFileSystem;
 import org.apache.hadoop.hbase.backup.RestoreRequest;
@@ -46,9 +49,8 @@ import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 @InterfaceAudience.Private
 public class BackupAdminImpl implements BackupAdmin {
@@ -65,12 +67,8 @@ public class BackupAdminImpl implements BackupAdmin {
 
   @Override
   public void close() throws IOException {
-    if (conn != null) {
-      conn.close();
-    }
   }
 
-
   @Override
   public BackupInfo getBackupInfo(String backupId) throws IOException {
     BackupInfo backupInfo = null;
@@ -105,12 +103,12 @@ public class BackupAdminImpl implements BackupAdmin {
       // is running by using startBackupSession API
       // If there is an active session in progress, exception will be thrown
       try {
-        sysTable.startBackupSession();
+        sysTable.startBackupExclusiveOperation();
         deleteSessionStarted = true;
       } catch (IOException e) {
         LOG.warn("You can not run delete command while active backup session is in progress. \n"
             + "If there is no active backup session running, run backup repair utility to restore \n"
-            +"backup system integrity.");
+            + "backup system integrity.");
         return -1;
       }
 
@@ -126,7 +124,7 @@ public class BackupAdminImpl implements BackupAdmin {
       sysTable.startDeleteOperation(backupIds);
       // Step 4: Snapshot backup system table
       if (!BackupSystemTable.snapshotExists(conn)) {
-          BackupSystemTable.snapshot(conn);
+        BackupSystemTable.snapshot(conn);
       } else {
         LOG.warn("Backup system table snapshot exists");
       }
@@ -154,13 +152,13 @@ public class BackupAdminImpl implements BackupAdmin {
         // Fail delete operation
         // Step 1
         if (snapshotDone) {
-          if(BackupSystemTable.snapshotExists(conn)) {
+          if (BackupSystemTable.snapshotExists(conn)) {
             BackupSystemTable.restoreFromSnapshot(conn);
             // delete snapshot
             BackupSystemTable.deleteSnapshot(conn);
             // We still have record with unfinished delete operation
-            LOG.error("Delete operation failed, please run backup repair utility to restore "+
-                       "backup system integrity", e);
+            LOG.error("Delete operation failed, please run backup repair utility to restore "
+                + "backup system integrity", e);
             throw e;
           } else {
             LOG.warn("Delete operation succeeded, there were some errors: ", e);
@@ -169,7 +167,7 @@ public class BackupAdminImpl implements BackupAdmin {
 
       } finally {
         if (deleteSessionStarted) {
-          sysTable.finishBackupSession();
+          sysTable.finishBackupExclusiveOperation();
         }
       }
     }
@@ -206,17 +204,17 @@ public class BackupAdminImpl implements BackupAdmin {
   /**
    * Delete single backup and all related backups <br>
    * Algorithm:<br>
-   *  Backup type: FULL or INCREMENTAL <br>
-   *  Is this last backup session for table T: YES or NO <br>
-   *  For every table T from table list 'tables':<br>
-   *  if(FULL, YES) deletes only physical data (PD) <br>
-   *  if(FULL, NO), deletes PD, scans all newer backups and removes T from backupInfo,<br>
-   *  until we either reach the most recent backup for T in the system or FULL backup<br>
-   *  which includes T<br>
-   *  if(INCREMENTAL, YES) deletes only physical data (PD)
-   *  if(INCREMENTAL, NO) deletes physical data and for table T scans all backup images between last<br>
-   *  FULL backup, which is older than the backup being deleted and the next FULL backup (if exists) <br>
-   *  or last one for a particular table T and removes T from list of backup tables.
+   * Backup type: FULL or INCREMENTAL <br>
+   * Is this last backup session for table T: YES or NO <br>
+   * For every table T from table list 'tables':<br>
+   * if(FULL, YES) deletes only physical data (PD) <br>
+   * if(FULL, NO), deletes PD, scans all newer backups and removes T from backupInfo,<br>
+   * until we either reach the most recent backup for T in the system or FULL backup<br>
+   * which includes T<br>
+   * if(INCREMENTAL, YES) deletes only physical data (PD) <br>
+   * if(INCREMENTAL, NO) deletes physical data and for table T scans all backup images between last<br>
+   * FULL backup, which is older than the backup being deleted and the next FULL backup (if exists) <br>
+   * or last one for a particular table T and removes T from list of backup tables.
    * @param backupId backup id
    * @param sysTable backup system table
    * @return total number of deleted backup images
@@ -285,8 +283,9 @@ public class BackupAdminImpl implements BackupAdmin {
     return totalDeleted;
   }
 
-  private void removeTableFromBackupImage(BackupInfo info, TableName tn,
-      BackupSystemTable sysTable) throws IOException {
+  private void
+      removeTableFromBackupImage(BackupInfo info, TableName tn, BackupSystemTable sysTable)
+          throws IOException {
     List<TableName> tables = info.getTableNames();
     LOG.debug("Remove " + tn + " from " + info.getBackupId() + " tables="
         + info.getTableListAsString());
@@ -485,7 +484,7 @@ public class BackupAdminImpl implements BackupAdmin {
 
   private String[] toStringArray(TableName[] list) {
     String[] arr = new String[list.length];
-    for(int i=0; i < list.length; i++) {
+    for (int i = 0; i < list.length; i++) {
       arr[i] = list[i].toString();
     }
     return arr;
@@ -521,7 +520,7 @@ public class BackupAdminImpl implements BackupAdmin {
     String targetRootDir = request.getTargetRootDir();
     List<TableName> tableList = request.getTableList();
 
-    String backupId =BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime();
+    String backupId = BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime();
     if (type == BackupType.INCREMENTAL) {
       Set<TableName> incrTableSet = null;
       try (BackupSystemTable table = new BackupSystemTable(conn)) {
@@ -529,19 +528,20 @@ public class BackupAdminImpl implements BackupAdmin {
       }
 
       if (incrTableSet.isEmpty()) {
-        String msg = "Incremental backup table set contains no tables. "
-            + "You need to run full backup first " +
-            (tableList != null ? "on "+StringUtils.join(tableList, ","): "");
+        String msg =
+            "Incremental backup table set contains no tables. "
+                + "You need to run full backup first "
+                + (tableList != null ? "on " + StringUtils.join(tableList, ",") : "");
 
         throw new IOException(msg);
       }
-      if(tableList != null) {
+      if (tableList != null) {
         tableList.removeAll(incrTableSet);
         if (!tableList.isEmpty()) {
           String extraTables = StringUtils.join(tableList, ",");
-          String msg = "Some tables (" + extraTables + ") haven't gone through full backup. "+
-            "Perform full backup on " + extraTables + " first, "
-            + "then retry the command";
+          String msg =
+              "Some tables (" + extraTables + ") haven't gone through full backup. "
+                  + "Perform full backup on " + extraTables + " first, " + "then retry the command";
           throw new IOException(msg);
         }
       }
@@ -584,14 +584,13 @@ public class BackupAdminImpl implements BackupAdmin {
 
     // update table list
     BackupRequest.Builder builder = new BackupRequest.Builder();
-    request = builder.withBackupType(request.getBackupType()).
-                      withTableList(tableList).
-                      withTargetRootDir(request.getTargetRootDir()).
-                      withBackupSetName(request.getBackupSetName()).
-                      withTotalTasks(request.getTotalTasks()).
-                      withBandwidthPerTasks((int)request.getBandwidth()).build();
-
-    TableBackupClient client =null;
+    request =
+        builder.withBackupType(request.getBackupType()).withTableList(tableList)
+            .withTargetRootDir(request.getTargetRootDir())
+            .withBackupSetName(request.getBackupSetName()).withTotalTasks(request.getTotalTasks())
+            .withBandwidthPerTasks((int) request.getBandwidth()).build();
+
+    TableBackupClient client = null;
     try {
       client = BackupClientFactory.create(conn, backupId, request);
     } catch (IOException e) {
@@ -613,4 +612,132 @@ public class BackupAdminImpl implements BackupAdmin {
     return tableList;
   }
 
+  @Override
+  public void mergeBackups(String[] backupIds) throws IOException {
+    try (final BackupSystemTable sysTable = new BackupSystemTable(conn);) {
+      checkIfValidForMerge(backupIds, sysTable);
+      BackupMergeJob job = BackupRestoreFactory.getBackupMergeJob(conn.getConfiguration());
+      job.run(backupIds);
+    }
+  }
+
+  /**
+   * Verifies that backup images are valid for merge.
+   *
+   * <ul>
+   * <li>All backups MUST be in the same destination</li>
+   * <li>No FULL backups are allowed - only INCREMENTAL</li>
+   * <li>All backups must be in COMPLETE state</li>
+   * <li>No holes in backup list are allowed: every COMPLETE INCREMENTAL image in the same
+   * destination that shares a table with the list and starts within its time range is listed</li>
+   * </ul>
+   * @param backupIds array of backup ids
+   * @param table backup system table
+   * @throws IOException if a backup id is not found in the system table or any check fails
+   */
+  private void checkIfValidForMerge(String[] backupIds, BackupSystemTable table)
+      throws IOException {
+    String backupRoot = null;
+
+    final Set<TableName> allTables = new HashSet<TableName>();
+    final Set<String> allBackups = new HashSet<String>();
+    long minTime = Long.MAX_VALUE, maxTime = Long.MIN_VALUE;
+    for (String backupId : backupIds) {
+      BackupInfo bInfo = table.readBackupInfo(backupId);
+      if (bInfo == null) {
+        String msg = "Backup session " + backupId + " not found";
+        throw new IOException(msg);
+      }
+      if (backupRoot == null) {
+        backupRoot = bInfo.getBackupRootDir();
+      } else if (!bInfo.getBackupRootDir().equals(backupRoot)) {
+        throw new IOException("Found different backup destinations in a list of a backup sessions \n"
+            + "1. " + backupRoot + "\n" + "2. " + bInfo.getBackupRootDir());
+      }
+      if (bInfo.getType() == BackupType.FULL) {
+        throw new IOException("FULL backup image can not be merged for: \n" + bInfo);
+      }
+
+      if (bInfo.getState() != BackupState.COMPLETE) {
+        throw new IOException("Backup image " + backupId
+            + " can not be merged because of its state: " + bInfo.getState());
+      }
+      allBackups.add(backupId);
+      allTables.addAll(bInfo.getTableNames());
+      long time = bInfo.getStartTs();
+      if (time < minTime) {
+        minTime = time;
+      }
+      if (time > maxTime) {
+        maxTime = time;
+      }
+    }
+
+    final long startRangeTime = minTime;
+    final long endRangeTime = maxTime;
+    final String backupDest = backupRoot;
+    // Check we have no 'holes' in backup id list
+    // Filter 1 : backupRoot
+    // Filter 2 : time range filter
+    // Filter 3 : table filter (type and state filters are applied as well)
+
+    BackupInfo.Filter destinationFilter = new BackupInfo.Filter() {
+
+      @Override
+      public boolean apply(BackupInfo info) {
+        return info.getBackupRootDir().equals(backupDest);
+      }
+    };
+
+    BackupInfo.Filter timeRangeFilter = new BackupInfo.Filter() {
+
+      @Override
+      public boolean apply(BackupInfo info) {
+        long time = info.getStartTs();
+        return time >= startRangeTime && time <= endRangeTime;
+      }
+    };
+
+    BackupInfo.Filter tableFilter = new BackupInfo.Filter() {
+
+      @Override
+      public boolean apply(BackupInfo info) {
+        List<TableName> tables = info.getTableNames();
+        return !Collections.disjoint(allTables, tables);
+      }
+    };
+
+    BackupInfo.Filter typeFilter = new BackupInfo.Filter() {
+
+      @Override
+      public boolean apply(BackupInfo info) {
+        return info.getType() == BackupType.INCREMENTAL;
+      }
+    };
+
+    BackupInfo.Filter stateFilter = new BackupInfo.Filter() {
+      @Override
+      public boolean apply(BackupInfo info) {
+        return info.getState() == BackupState.COMPLETE;
+      }
+    };
+
+    List<BackupInfo> allInfos =
+        table.getBackupHistory(-1, destinationFilter,
+          timeRangeFilter, tableFilter, typeFilter, stateFilter);
+    if (allInfos.size() != allBackups.size()) {
+      // Yes we have at least one hole in backup image sequence
+      List<String> missingIds = new ArrayList<String>();
+      for (BackupInfo info : allInfos) {
+        if (allBackups.contains(info.getBackupId())) {
+          continue;
+        }
+        missingIds.add(info.getBackupId());
+      }
+      String errMsg =
+          "Sequence of backup ids has 'holes'. The following backup images must be added: "
+              + org.apache.hadoop.util.StringUtils.join(",", missingIds);
+      throw new IOException(errMsg);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
index aa15fba..650ba2e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
@@ -59,16 +59,15 @@ import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
  * General backup commands, options and usage messages
  */
 
 @InterfaceAudience.Private
-public final class BackupCommands  {
+public final class BackupCommands {
 
   public final static String INCORRECT_USAGE = "Incorrect usage";
 
@@ -79,7 +78,8 @@ public final class BackupCommands  {
       + "  history    show history of all successful backups\n"
       + "  progress   show the progress of the latest backup request\n"
       + "  set        backup set management\n"
-      + "  repair     repair backup system table"
+      + "  repair     repair backup system table\n"
+      + "  merge      merge backup images\n"
       + "Run \'hbase backup COMMAND -h\' to see help message for each command\n";
 
   public static final String CREATE_CMD_USAGE =
@@ -109,17 +109,20 @@ public final class BackupCommands  {
 
   public static final String SET_CMD_USAGE = "Usage: hbase backup set COMMAND [name] [tables]\n"
       + "  name            Backup set name\n"
-      + "  tables          Comma separated list of tables.\n"
-      + "COMMAND is one of:\n" + "  add             add tables to a set, create a set if needed\n"
+      + "  tables          Comma separated list of tables.\n" + "COMMAND is one of:\n"
+      + "  add             add tables to a set, create a set if needed\n"
       + "  remove          remove tables from a set\n"
       + "  list            list all backup sets in the system\n"
       + "  describe        describe set\n" + "  delete          delete backup set\n";
+  public static final String MERGE_CMD_USAGE = "Usage: hbase backup merge [backup_ids]\n"
+      + "  backup_ids      Comma separated list of backup image ids.\n";
 
   public static final String USAGE_FOOTER = "";
 
   public static abstract class Command extends Configured {
     CommandLine cmdline;
     Connection conn;
+
     Command(Configuration conf) {
       if (conf == null) {
         conf = HBaseConfiguration.create();
@@ -140,7 +143,7 @@ public final class BackupCommands  {
         try (BackupSystemTable table = new BackupSystemTable(conn);) {
           List<BackupInfo> sessions = table.getBackupInfos(BackupState.RUNNING);
 
-          if(sessions.size() > 0) {
+          if (sessions.size() > 0) {
             System.err.println("Found backup session in a RUNNING state: ");
             System.err.println(sessions.get(0));
             System.err.println("This may indicate that a previous session has failed abnormally.");
@@ -154,11 +157,19 @@ public final class BackupCommands  {
         try (BackupSystemTable table = new BackupSystemTable(conn);) {
           String[] ids = table.getListOfBackupIdsFromDeleteOperation();
 
-          if(ids !=null && ids.length > 0) {
-            System.err.println("Found failed backup delete coommand. ");
+          if (ids != null && ids.length > 0) {
+            System.err.println("Found failed backup DELETE coommand. ");
             System.err.println("Backup system recovery is required.");
-            throw new IOException("Failed backup delete found, aborted command execution");
+            throw new IOException("Failed backup DELETE found, aborted command execution");
           }
+
+          ids = table.getListOfBackupIdsFromMergeOperation();
+          if (ids != null && ids.length > 0) {
+            System.err.println("Found failed backup MERGE coommand. ");
+            System.err.println("Backup system recovery is required.");
+            throw new IOException("Failed backup MERGE found, aborted command execution");
+          }
+
         }
       }
     }
@@ -178,10 +189,10 @@ public final class BackupCommands  {
     protected boolean requiresNoActiveSession() {
       return false;
     }
+
     /**
-     * Command requires consistent state of a backup system
-     * Backup system may become inconsistent because of an abnormal
-     * termination of a backup session or delete command
+     * Command requires consistent state of a backup system. Backup system may become inconsistent
+     * because of an abnormal termination of a backup session or delete command
      * @return true, if yes
      */
     protected boolean requiresConsistentState() {
@@ -220,6 +231,9 @@ public final class BackupCommands  {
     case REPAIR:
       cmd = new RepairCommand(conf, cmdline);
       break;
+    case MERGE:
+      cmd = new MergeCommand(conf, cmdline);
+      break;
     case HELP:
     default:
       cmd = new HelpCommand(conf, cmdline);
@@ -257,7 +271,7 @@ public final class BackupCommands  {
         throw new IOException(INCORRECT_USAGE);
       }
       String[] args = cmdline.getArgs();
-      if (args.length !=3) {
+      if (args.length != 3) {
         printUsage();
         throw new IOException(INCORRECT_USAGE);
       }
@@ -274,7 +288,6 @@ public final class BackupCommands  {
         throw new IOException(INCORRECT_USAGE);
       }
 
-
       String tables = null;
 
       // Check if we have both: backup set and list of tables
@@ -310,14 +323,14 @@ public final class BackupCommands  {
 
       try (BackupAdminImpl admin = new BackupAdminImpl(conn);) {
 
-       BackupRequest.Builder builder = new BackupRequest.Builder();
-       BackupRequest request = builder.withBackupType(BackupType.valueOf(args[1].toUpperCase()))
-            .withTableList(tables != null ?
-                          Lists.newArrayList(BackupUtils.parseTableNames(tables)) : null)
-            .withTargetRootDir(args[2])
-            .withTotalTasks(workers)
-            .withBandwidthPerTasks(bandwidth)
-            .withBackupSetName(setName).build();
+        BackupRequest.Builder builder = new BackupRequest.Builder();
+        BackupRequest request =
+            builder
+                .withBackupType(BackupType.valueOf(args[1].toUpperCase()))
+                .withTableList(
+                  tables != null ? Lists.newArrayList(BackupUtils.parseTableNames(tables)) : null)
+                .withTargetRootDir(args[2]).withTotalTasks(workers)
+                .withBandwidthPerTasks(bandwidth).withBackupSetName(setName).build();
         String backupId = admin.backupTables(request);
         System.out.println("Backup session " + backupId + " finished. Status: SUCCESS");
       } catch (IOException e) {
@@ -544,7 +557,8 @@ public final class BackupCommands  {
         int deleted = admin.deleteBackups(backupIds);
         System.out.println("Deleted " + deleted + " backups. Total requested: " + args.length);
       } catch (IOException e) {
-        System.err.println("Delete command FAILED. Please run backup repair tool to restore backup system integrity");
+        System.err
+            .println("Delete command FAILED. Please run backup repair tool to restore backup system integrity");
         throw e;
       }
 
@@ -584,8 +598,9 @@ public final class BackupCommands  {
         if (list.size() == 0) {
           // No failed sessions found
           System.out.println("REPAIR status: no failed sessions found."
-          +" Checking failed delete backup operation ...");
+              + " Checking failed delete backup operation ...");
           repairFailedBackupDeletionIfAny(conn, sysTable);
+          repairFailedBackupMergeIfAny(conn, sysTable);
           return;
         }
         backupInfo = list.get(0);
@@ -606,32 +621,55 @@ public final class BackupCommands  {
         // If backup session is updated to FAILED state - means we
         // processed recovery already.
         sysTable.updateBackupInfo(backupInfo);
-        sysTable.finishBackupSession();
-        System.out.println("REPAIR status: finished repair failed session:\n "+ backupInfo);
+        sysTable.finishBackupExclusiveOperation();
+        System.out.println("REPAIR status: finished repair failed session:\n " + backupInfo);
 
       }
     }
 
     private void repairFailedBackupDeletionIfAny(Connection conn, BackupSystemTable sysTable)
-        throws IOException
-    {
+        throws IOException {
       String[] backupIds = sysTable.getListOfBackupIdsFromDeleteOperation();
-      if (backupIds == null ||backupIds.length == 0) {
-        System.out.println("No failed backup delete operation found");
+      if (backupIds == null || backupIds.length == 0) {
+        System.out.println("No failed backup DELETE operation found");
         // Delete backup table snapshot if exists
         BackupSystemTable.deleteSnapshot(conn);
         return;
       }
-      System.out.println("Found failed delete operation for: " + StringUtils.join(backupIds));
-      System.out.println("Running delete again ...");
+      System.out.println("Found failed DELETE operation for: " + StringUtils.join(backupIds));
+      System.out.println("Running DELETE again ...");
       // Restore table from snapshot
       BackupSystemTable.restoreFromSnapshot(conn);
       // Finish previous failed session
-      sysTable.finishBackupSession();
-      try(BackupAdmin admin = new BackupAdminImpl(conn);) {
+      sysTable.finishBackupExclusiveOperation();
+      try (BackupAdmin admin = new BackupAdminImpl(conn);) {
         admin.deleteBackups(backupIds);
       }
-      System.out.println("Delete operation finished OK: "+ StringUtils.join(backupIds));
+      System.out.println("DELETE operation finished OK: " + StringUtils.join(backupIds));
+
+    }
+
+    private void repairFailedBackupMergeIfAny(Connection conn, BackupSystemTable sysTable)
+        throws IOException {
+      String[] backupIds = sysTable.getListOfBackupIdsFromMergeOperation();
+      if (backupIds == null || backupIds.length == 0) {
+        System.out.println("No failed backup MERGE operation found");
+        // Delete backup table snapshot if exists
+        BackupSystemTable.deleteSnapshot(conn);
+        return;
+      }
+      System.out.println("Found failed MERGE operation for: " + StringUtils.join(backupIds));
+      System.out.println("Running MERGE again ...");
+      // Restore table from snapshot
+      BackupSystemTable.restoreFromSnapshot(conn);
+      // Unlock backup system
+      sysTable.finishBackupExclusiveOperation();
+      // Finish previous failed session
+      sysTable.finishMergeOperation();
+      try (BackupAdmin admin = new BackupAdminImpl(conn);) {
+        admin.mergeBackups(backupIds);
+      }
+      System.out.println("MERGE operation finished OK: " + StringUtils.join(backupIds));
 
     }
 
@@ -641,6 +679,56 @@ public final class BackupCommands  {
     }
   }
 
+  private static class MergeCommand extends Command {
+
+    MergeCommand(Configuration conf, CommandLine cmdline) {
+      super(conf);
+      this.cmdline = cmdline;
+    }
+
+    @Override
+    protected boolean requiresNoActiveSession() {
+      return true;
+    }
+
+    @Override
+    protected boolean requiresConsistentState() {
+      return true;
+    }
+
+    @Override
+    public void execute() throws IOException {
+      super.execute();
+
+      String[] args = cmdline == null ? null : cmdline.getArgs();
+      if (args == null || (args.length != 2)) {
+        System.err.println("ERROR: wrong number of arguments: "
+            + (args == null ? null : args.length));
+        printUsage();
+        throw new IOException(INCORRECT_USAGE);
+      }
+
+      String[] backupIds = args[1].split(",");
+      if (backupIds.length < 2) {
+        String msg = "ERROR: can not merge a single backup image. "+
+            "Number of images must be greater than 1.";
+        System.err.println(msg);
+        throw new IOException(msg);
+
+      }
+      Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create();
+      try (final Connection conn = ConnectionFactory.createConnection(conf);
+          final BackupAdminImpl admin = new BackupAdminImpl(conn);) {
+        admin.mergeBackups(backupIds);
+      }
+    }
+
+    @Override
+    protected void printUsage() {
+      System.out.println(MERGE_CMD_USAGE);
+    }
+  }
+
   // TODO Cancel command
 
   private static class CancelCommand extends Command {
@@ -672,7 +760,6 @@ public final class BackupCommands  {
     @Override
     public void execute() throws IOException {
 
-
       int n = parseHistoryLength();
       final TableName tableName = getTableName();
       final String setName = getTableSetName();
@@ -883,7 +970,7 @@ public final class BackupCommands  {
 
     private TableName[] toTableNames(String[] tables) {
       TableName[] arr = new TableName[tables.length];
-      for (int i=0; i < tables.length; i++) {
+      for (int i = 0; i < tables.length; i++) {
         arr[i] = TableName.valueOf(tables[i]);
       }
       return arr;

http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index bf80506..8fe5eaf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -115,8 +115,8 @@ public class BackupManager implements Closeable {
     }
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Added log cleaner: " + cleanerClass +"\n" +
-                "Added master procedure manager: " + masterProcedureClass);
+      LOG.debug("Added log cleaner: " + cleanerClass + "\n" + "Added master procedure manager: "
+          + masterProcedureClass);
     }
 
   }
@@ -185,9 +185,8 @@ public class BackupManager implements Closeable {
    * @return BackupInfo
    * @throws BackupException exception
    */
-  public BackupInfo createBackupInfo(String backupId, BackupType type,
-      List<TableName> tableList, String targetRootDir, int workers, long bandwidth)
-      throws BackupException {
+  public BackupInfo createBackupInfo(String backupId, BackupType type, List<TableName> tableList,
+      String targetRootDir, int workers, long bandwidth) throws BackupException {
     if (targetRootDir == null) {
       throw new BackupException("Wrong backup request parameter: target backup root directory");
     }
@@ -313,7 +312,7 @@ public class BackupManager implements Closeable {
           }
         } else {
           Path logBackupPath =
-              HBackupFileSystem.getLogBackupPath(backup.getBackupRootDir(), backup.getBackupId());
+              HBackupFileSystem.getBackupPath(backup.getBackupRootDir(), backup.getBackupId());
           LOG.debug("Current backup has an incremental backup ancestor, "
               + "touching its image manifest in " + logBackupPath.toString()
               + " to construct the dependency.");
@@ -371,7 +370,7 @@ public class BackupManager implements Closeable {
    * @throws IOException if active session already exists
    */
   public void startBackupSession() throws IOException {
-    systemTable.startBackupSession();
+    systemTable.startBackupExclusiveOperation();
   }
 
   /**
@@ -379,10 +378,9 @@ public class BackupManager implements Closeable {
    * @throws IOException if no active session
    */
   public void finishBackupSession() throws IOException {
-    systemTable.finishBackupSession();
+    systemTable.finishBackupExclusiveOperation();
   }
 
-
   /**
    * Read the last backup start code (timestamp) of last successful backup. Will return null if
    * there is no startcode stored in backup system table or the value is of length 0. These two
@@ -413,7 +411,7 @@ public class BackupManager implements Closeable {
   }
 
   public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
-  readBulkloadRows(List<TableName> tableList) throws IOException {
+      readBulkloadRows(List<TableName> tableList) throws IOException {
     return systemTable.readBulkloadRows(tableList);
   }
 
@@ -448,8 +446,7 @@ public class BackupManager implements Closeable {
    */
   public void writeRegionServerLogTimestamp(Set<TableName> tables,
       HashMap<String, Long> newTimestamps) throws IOException {
-    systemTable.writeRegionServerLogTimestamp(tables, newTimestamps,
-      backupInfo.getBackupRootDir());
+    systemTable.writeRegionServerLogTimestamp(tables, newTimestamps, backupInfo.getBackupRootDir());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
index b8adac9..7e3201e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -61,9 +62,8 @@ public class BackupManifest {
   public static final String MANIFEST_FILE_NAME = ".backup.manifest";
 
   /**
-   *  Backup image, the dependency graph is made up by series of backup images
-   *  BackupImage contains all the relevant information to restore the backup and
-   *  is used during restore operation
+   * Backup image, the dependency graph is made up by series of backup images. BackupImage contains
+   * all the relevant information to restore the backup and is used during restore operation.
    */
 
   public static class BackupImage implements Comparable<BackupImage> {
@@ -294,6 +294,16 @@ public class BackupManifest {
       return this.ancestors;
     }
 
+    public void removeAncestors(List<String> backupIds) {
+      List<BackupImage> toRemove = new ArrayList<BackupImage>();
+      for (BackupImage im : this.ancestors) {
+        if (backupIds.contains(im.getBackupId())) {
+          toRemove.add(im);
+        }
+      }
+      this.ancestors.removeAll(toRemove);
+    }
+
     private void addAncestor(BackupImage backupImage) {
       this.getAncestors().add(backupImage);
     }
@@ -464,18 +474,16 @@ public class BackupManifest {
   }
 
   /**
-   * Persist the manifest file.
+   * TODO: fix it. Persist the manifest file.
    * @throws IOException IOException when storing the manifest file.
    */
 
   public void store(Configuration conf) throws BackupException {
     byte[] data = backupImage.toProto().toByteArray();
     // write the file, overwrite if already exist
-    String logBackupDir =
-        BackupUtils.getLogBackupDir(backupImage.getRootDir(), backupImage.getBackupId());
     Path manifestFilePath =
-        new Path(new Path((tableBackupDir != null ? tableBackupDir : logBackupDir)),
-            MANIFEST_FILE_NAME);
+        new Path(HBackupFileSystem.getBackupPath(backupImage.getRootDir(),
+          backupImage.getBackupId()), MANIFEST_FILE_NAME);
     try (FSDataOutputStream out =
         manifestFilePath.getFileSystem(conf).create(manifestFilePath, true);) {
       out.write(data);

http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index e5a3daa..4dab046 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.util.Pair;
  * value = backupId and full WAL file name</li>
  * </ul></p>
  */
+
 @InterfaceAudience.Private
 public final class BackupSystemTable implements Closeable {
   private static final Log LOG = LogFactory.getLog(BackupSystemTable.class);
@@ -118,7 +119,7 @@ public final class BackupSystemTable implements Closeable {
 
   private TableName tableName;
   /**
-   *  Stores backup sessions (contexts)
+   * Stores backup sessions (contexts)
    */
   final static byte[] SESSIONS_FAMILY = "session".getBytes();
   /**
@@ -127,11 +128,10 @@ public final class BackupSystemTable implements Closeable {
   final static byte[] META_FAMILY = "meta".getBytes();
   final static byte[] BULK_LOAD_FAMILY = "bulk".getBytes();
   /**
-   *  Connection to HBase cluster, shared among all instances
+   * Connection to HBase cluster, shared among all instances
    */
   private final Connection connection;
 
-
   private final static String BACKUP_INFO_PREFIX = "session:";
   private final static String START_CODE_ROW = "startcode:";
   private final static byte[] ACTIVE_SESSION_ROW = "activesession:".getBytes();
@@ -147,6 +147,7 @@ public final class BackupSystemTable implements Closeable {
   private final static String BULK_LOAD_PREFIX = "bulk:";
   private final static byte[] BULK_LOAD_PREFIX_BYTES = BULK_LOAD_PREFIX.getBytes();
   private final static byte[] DELETE_OP_ROW = "delete_op_row".getBytes();
+  private final static byte[] MERGE_OP_ROW = "merge_op_row".getBytes();
 
   final static byte[] TBL_COL = Bytes.toBytes("tbl");
   final static byte[] FAM_COL = Bytes.toBytes("fam");
@@ -160,7 +161,7 @@ public final class BackupSystemTable implements Closeable {
   private final static String SET_KEY_PREFIX = "backupset:";
 
   // separator between BULK_LOAD_PREFIX and ordinals
- protected final static String BLK_LD_DELIM = ":";
+  protected final static String BLK_LD_DELIM = ":";
   private final static byte[] EMPTY_VALUE = new byte[] {};
 
   // Safe delimiter in a string
@@ -187,19 +188,19 @@ public final class BackupSystemTable implements Closeable {
   }
 
   private void verifyNamespaceExists(Admin admin) throws IOException {
-      String namespaceName  = tableName.getNamespaceAsString();
-      NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build();
-      NamespaceDescriptor[] list = admin.listNamespaceDescriptors();
-      boolean exists = false;
-      for( NamespaceDescriptor nsd: list) {
-        if (nsd.getName().equals(ns.getName())) {
-          exists = true;
-          break;
-        }
-      }
-      if (!exists) {
-        admin.createNamespace(ns);
+    String namespaceName = tableName.getNamespaceAsString();
+    NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build();
+    NamespaceDescriptor[] list = admin.listNamespaceDescriptors();
+    boolean exists = false;
+    for (NamespaceDescriptor nsd : list) {
+      if (nsd.getName().equals(ns.getName())) {
+        exists = true;
+        break;
       }
+    }
+    if (!exists) {
+      admin.createNamespace(ns);
+    }
   }
 
   private void waitForSystemTable(Admin admin) throws IOException {
@@ -211,15 +212,13 @@ public final class BackupSystemTable implements Closeable {
       } catch (InterruptedException e) {
       }
       if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) {
-        throw new IOException("Failed to create backup system table after "+ TIMEOUT+"ms");
+        throw new IOException("Failed to create backup system table after " + TIMEOUT + "ms");
       }
     }
     LOG.debug("Backup table exists and available");
 
   }
 
-
-
   @Override
   public void close() {
     // do nothing
@@ -257,7 +256,7 @@ public final class BackupSystemTable implements Closeable {
         byte[] row = CellUtil.cloneRow(res.listCells().get(0));
         for (Cell cell : res.listCells()) {
           if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
-              BackupSystemTable.PATH_COL.length) == 0) {
+            BackupSystemTable.PATH_COL.length) == 0) {
             map.put(row, Bytes.toString(CellUtil.cloneValue(cell)));
           }
         }
@@ -286,13 +285,13 @@ public final class BackupSystemTable implements Closeable {
         String path = null;
         for (Cell cell : res.listCells()) {
           if (CellComparator.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0,
-              BackupSystemTable.TBL_COL.length) == 0) {
+            BackupSystemTable.TBL_COL.length) == 0) {
             tbl = TableName.valueOf(CellUtil.cloneValue(cell));
           } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
-              BackupSystemTable.FAM_COL.length) == 0) {
+            BackupSystemTable.FAM_COL.length) == 0) {
             fam = CellUtil.cloneValue(cell);
           } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
-              BackupSystemTable.PATH_COL.length) == 0) {
+            BackupSystemTable.PATH_COL.length) == 0) {
             path = Bytes.toString(CellUtil.cloneValue(cell));
           }
         }
@@ -313,9 +312,10 @@ public final class BackupSystemTable implements Closeable {
         }
         files.add(new Path(path));
         if (LOG.isDebugEnabled()) {
-          LOG.debug("found bulk loaded file : " + tbl + " " +  Bytes.toString(fam) + " " + path);
+          LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path);
         }
-      };
+      }
+      ;
       return mapForSrc;
     }
   }
@@ -359,16 +359,16 @@ public final class BackupSystemTable implements Closeable {
   public void writePathsPostBulkLoad(TableName tabName, byte[] region,
       Map<byte[], List<Path>> finalPaths) throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("write bulk load descriptor to backup " + tabName + " with " +
-          finalPaths.size() + " entries");
+      LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size()
+          + " entries");
     }
     try (Table table = connection.getTable(tableName)) {
-      List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region,
-          finalPaths);
+      List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths);
       table.put(puts);
       LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
     }
   }
+
   /*
    * For preCommitStoreFile() hook
    * @param tabName table name
@@ -376,15 +376,15 @@ public final class BackupSystemTable implements Closeable {
    * @param family column family
    * @param pairs list of paths for hfiles
    */
-  public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region,
-      final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
+  public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family,
+      final List<Pair<Path, Path>> pairs) throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("write bulk load descriptor to backup " + tabName + " with " +
-          pairs.size() + " entries");
+      LOG.debug("write bulk load descriptor to backup " + tabName + " with " + pairs.size()
+          + " entries");
     }
     try (Table table = connection.getTable(tableName)) {
-      List<Put> puts = BackupSystemTable.createPutForPreparedBulkload(tabName, region,
-          family, pairs);
+      List<Put> puts =
+          BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs);
       table.put(puts);
       LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
     }
@@ -411,11 +411,11 @@ public final class BackupSystemTable implements Closeable {
   /*
    * Reads the rows from backup table recording bulk loaded hfiles
    * @param tableList list of table names
-   * @return The keys of the Map are table, region and column family.
-   *  Value of the map reflects whether the hfile was recorded by preCommitStoreFile hook (true)
+   * @return The keys of the Map are table, region and column family. Value of the map reflects
+   * whether the hfile was recorded by preCommitStoreFile hook (true)
    */
   public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
-  readBulkloadRows(List<TableName> tableList) throws IOException {
+      readBulkloadRows(List<TableName> tableList) throws IOException {
     Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>();
     List<byte[]> rows = new ArrayList<>();
     for (TableName tTable : tableList) {
@@ -437,13 +437,13 @@ public final class BackupSystemTable implements Closeable {
             String rowStr = Bytes.toString(row);
             region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
             if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
-                BackupSystemTable.FAM_COL.length) == 0) {
+              BackupSystemTable.FAM_COL.length) == 0) {
               fam = Bytes.toString(CellUtil.cloneValue(cell));
             } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
-                BackupSystemTable.PATH_COL.length) == 0) {
+              BackupSystemTable.PATH_COL.length) == 0) {
               path = Bytes.toString(CellUtil.cloneValue(cell));
             } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0,
-                BackupSystemTable.STATE_COL.length) == 0) {
+              BackupSystemTable.STATE_COL.length) == 0) {
               byte[] state = CellUtil.cloneValue(cell);
               if (Bytes.equals(BackupSystemTable.BL_PREPARE, state)) {
                 raw = true;
@@ -484,12 +484,13 @@ public final class BackupSystemTable implements Closeable {
         Map<byte[], List<Path>> map = maps[idx];
         TableName tn = sTableList.get(idx);
         if (map == null) continue;
-        for (Map.Entry<byte[], List<Path>> entry: map.entrySet()) {
+        for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
           byte[] fam = entry.getKey();
           List<Path> paths = entry.getValue();
           for (Path p : paths) {
-            Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(),
-                backupId, ts, cnt++);
+            Put put =
+                BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId, ts,
+                  cnt++);
             puts.add(put);
           }
         }
@@ -564,18 +565,23 @@ public final class BackupSystemTable implements Closeable {
     }
   }
 
-  public void startBackupSession() throws IOException {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Start new backup session");
+  /**
+   * Exclusive operations are:
+   * create, delete, merge
+   * @throws IOException
+   */
+  public void startBackupExclusiveOperation() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Start new backup exclusive operation");
     }
     try (Table table = connection.getTable(tableName)) {
       Put put = createPutForStartBackupSession();
-      //First try to put if row does not exist
+      // First try to put if row does not exist
       if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, null, put)) {
         // Row exists, try to put if value == ACTIVE_SESSION_NO
         if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL,
           ACTIVE_SESSION_NO, put)) {
-          throw new IOException("There is an active backup session");
+          throw new IOException("There is an active backup exclusive operation");
         }
       }
     }
@@ -587,17 +593,15 @@ public final class BackupSystemTable implements Closeable {
     return put;
   }
 
-  public void finishBackupSession() throws IOException
-  {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Stop backup session");
+  public void finishBackupExclusiveOperation() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Finish backup exclusive operation");
     }
     try (Table table = connection.getTable(tableName)) {
       Put put = createPutForStopBackupSession();
-      if(!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL,
-        ACTIVE_SESSION_YES, put))
-      {
-        throw new IOException("There is no active backup session");
+      if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL,
+        ACTIVE_SESSION_YES, put)) {
+        throw new IOException("There is no active backup exclusive operation");
       }
     }
   }
@@ -630,8 +634,7 @@ public final class BackupSystemTable implements Closeable {
         res.advance();
         Cell cell = res.current();
         byte[] row = CellUtil.cloneRow(cell);
-        String server =
-            getServerNameForReadRegionServerLastLogRollResult(row);
+        String server = getServerNameForReadRegionServerLastLogRollResult(row);
         byte[] data = CellUtil.cloneValue(cell);
         rsTimestampMap.put(server, Bytes.toLong(data));
       }
@@ -652,8 +655,7 @@ public final class BackupSystemTable implements Closeable {
       LOG.trace("write region server last roll log result to backup system table");
     }
     try (Table table = connection.getTable(tableName)) {
-      Put put =
-          createPutForRegionServerLastLogRollResult(server, ts, backupRoot);
+      Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot);
       table.put(put);
     }
   }
@@ -685,14 +687,15 @@ public final class BackupSystemTable implements Closeable {
 
   /**
    * Get first n backup history records
-   * @param n number of records
+   * @param n number of records; if n == -1, the max number
+   *        is ignored
    * @return list of records
    * @throws IOException
    */
   public List<BackupInfo> getHistory(int n) throws IOException {
 
     List<BackupInfo> history = getBackupHistory();
-    if (history.size() <= n) return history;
+    if (n == -1 || history.size() <= n) return history;
     List<BackupInfo> list = new ArrayList<BackupInfo>();
     for (int i = 0; i < n; i++) {
       list.add(history.get(i));
@@ -703,7 +706,8 @@ public final class BackupSystemTable implements Closeable {
 
   /**
    * Get backup history records filtered by list of filters.
-   * @param n max number of records
+   * @param n max number of records; if n == -1, then max number
+   *        is ignored
    * @param filters list of filters
    * @return backup records
    * @throws IOException
@@ -714,7 +718,7 @@ public final class BackupSystemTable implements Closeable {
     List<BackupInfo> history = getBackupHistory();
     List<BackupInfo> result = new ArrayList<BackupInfo>();
     for (BackupInfo bi : history) {
-      if (result.size() == n) break;
+      if (n >= 0 && result.size() == n) break;
       boolean passed = true;
       for (int i = 0; i < filters.length; i++) {
         if (!filters[i].apply(bi)) {
@@ -852,9 +856,7 @@ public final class BackupSystemTable implements Closeable {
     List<Put> puts = new ArrayList<Put>();
     for (TableName table : tables) {
       byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray();
-      Put put =
-          createPutForWriteRegionServerLogTimestamp(table, smapData,
-            backupRoot);
+      Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot);
       puts.add(put);
     }
     try (Table table = connection.getTable(tableName)) {
@@ -1018,8 +1020,7 @@ public final class BackupSystemTable implements Closeable {
       }
     }
     try (Table table = connection.getTable(tableName)) {
-      List<Put> puts =
-          createPutsForAddWALFiles(files, backupId, backupRoot);
+      List<Put> puts = createPutsForAddWALFiles(files, backupId, backupRoot);
       table.put(puts);
     }
   }
@@ -1087,6 +1088,7 @@ public final class BackupSystemTable implements Closeable {
    * @param file name of a file to check
    * @return true, if deletable, false otherwise.
    * @throws IOException exception
+   * TODO: multiple backup destination support
    */
   public boolean isWALFileDeletable(String file) throws IOException {
     if (LOG.isTraceEnabled()) {
@@ -1271,12 +1273,12 @@ public final class BackupSystemTable implements Closeable {
       if (disjoint.length > 0 && disjoint.length != tables.length) {
         Put put = createPutForBackupSet(name, disjoint);
         table.put(put);
-      } else if(disjoint.length == tables.length) {
+      } else if (disjoint.length == tables.length) {
         LOG.warn("Backup set '" + name + "' does not contain tables ["
             + StringUtils.join(toRemove, " ") + "]");
       } else { // disjoint.length == 0 and tables.length >0
-        // Delete  backup set
-        LOG.info("Backup set '"+name+"' is empty. Deleting.");
+        // Delete backup set
+        LOG.info("Backup set '" + name + "' is empty. Deleting.");
         deleteBackupSet(name);
       }
     } finally {
@@ -1356,7 +1358,7 @@ public final class BackupSystemTable implements Closeable {
   }
 
   public static String getSnapshotName(Configuration conf) {
-    return "snapshot_"+getTableNameAsString(conf).replace(":", "_");
+    return "snapshot_" + getTableNameAsString(conf).replace(":", "_");
   }
 
   /**
@@ -1589,17 +1591,16 @@ public final class BackupSystemTable implements Closeable {
       for (Path path : entry.getValue()) {
         String file = path.toString();
         int lastSlash = file.lastIndexOf("/");
-        String filename = file.substring(lastSlash+1);
-        Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
-            Bytes.toString(region), BLK_LD_DELIM, filename));
+        String filename = file.substring(lastSlash + 1);
+        Put put =
+            new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
+              Bytes.toString(region), BLK_LD_DELIM, filename));
         put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
         put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
-        put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL,
-            file.getBytes());
+        put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes());
         put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT);
         puts.add(put);
-        LOG.debug("writing done bulk path " + file + " for " + table + " " +
-            Bytes.toString(region));
+        LOG.debug("writing done bulk path " + file + " for " + table + " " + Bytes.toString(region));
       }
     }
     return puts;
@@ -1607,19 +1608,16 @@ public final class BackupSystemTable implements Closeable {
 
   public static void snapshot(Connection conn) throws IOException {
 
-    try (Admin admin = conn.getAdmin();){
+    try (Admin admin = conn.getAdmin();) {
       Configuration conf = conn.getConfiguration();
-      admin.snapshot(BackupSystemTable.getSnapshotName(conf),
-        BackupSystemTable.getTableName(conf));
+      admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf));
     }
   }
 
-  public static void restoreFromSnapshot(Connection conn)
-      throws IOException {
+  public static void restoreFromSnapshot(Connection conn) throws IOException {
 
     Configuration conf = conn.getConfiguration();
-    LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) +
-        " from snapshot");
+    LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot");
     try (Admin admin = conn.getAdmin();) {
       String snapshotName = BackupSystemTable.getSnapshotName(conf);
       if (snapshotExists(admin, snapshotName)) {
@@ -1631,8 +1629,8 @@ public final class BackupSystemTable implements Closeable {
         // Snapshot does not exists, i.e completeBackup failed after
         // deleting backup system table snapshot
         // In this case we log WARN and proceed
-        LOG.warn("Could not restore backup system table. Snapshot " + snapshotName+
-          " does not exists.");
+        LOG.warn("Could not restore backup system table. Snapshot " + snapshotName
+            + " does not exists.");
       }
     }
   }
@@ -1640,7 +1638,7 @@ public final class BackupSystemTable implements Closeable {
   protected static boolean snapshotExists(Admin admin, String snapshotName) throws IOException {
 
     List<SnapshotDescription> list = admin.listSnapshots();
-    for (SnapshotDescription desc: list) {
+    for (SnapshotDescription desc : list) {
       if (desc.getName().equals(snapshotName)) {
         return true;
       }
@@ -1648,26 +1646,25 @@ public final class BackupSystemTable implements Closeable {
     return false;
   }
 
-  public static boolean snapshotExists (Connection conn) throws IOException {
+  public static boolean snapshotExists(Connection conn) throws IOException {
     return snapshotExists(conn.getAdmin(), getSnapshotName(conn.getConfiguration()));
   }
 
-  public static void deleteSnapshot(Connection conn)
-      throws IOException {
+  public static void deleteSnapshot(Connection conn) throws IOException {
 
     Configuration conf = conn.getConfiguration();
-    LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) +
-        " from the system");
+    LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system");
     try (Admin admin = conn.getAdmin();) {
       String snapshotName = BackupSystemTable.getSnapshotName(conf);
       if (snapshotExists(admin, snapshotName)) {
         admin.deleteSnapshot(snapshotName);
         LOG.debug("Done deleting backup system table snapshot");
       } else {
-        LOG.error("Snapshot "+snapshotName+" does not exists");
+        LOG.error("Snapshot " + snapshotName + " does not exists");
       }
     }
   }
+
   /*
    * Creates Put's for bulk load resulting from running LoadIncrementalHFiles
    */
@@ -1678,17 +1675,16 @@ public final class BackupSystemTable implements Closeable {
       Path path = pair.getSecond();
       String file = path.toString();
       int lastSlash = file.lastIndexOf("/");
-      String filename = file.substring(lastSlash+1);
-      Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
-          Bytes.toString(region), BLK_LD_DELIM, filename));
+      String filename = file.substring(lastSlash + 1);
+      Put put =
+          new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, Bytes.toString(region),
+            BLK_LD_DELIM, filename));
       put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
       put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family);
-      put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL,
-          file.getBytes());
+      put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes());
       put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE);
       puts.add(put);
-      LOG.debug("writing raw bulk path " + file + " for " + table + " " +
-          Bytes.toString(region));
+      LOG.debug("writing raw bulk path " + file + " for " + table + " " + Bytes.toString(region));
     }
     return puts;
   }
@@ -1725,7 +1721,6 @@ public final class BackupSystemTable implements Closeable {
     return get;
   }
 
-
   public void startDeleteOperation(String[] backupIdList) throws IOException {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList));
@@ -1765,6 +1760,96 @@ public final class BackupSystemTable implements Closeable {
     }
   }
 
+  /** Builds the Put that stores the comma-joined backup ids in FAM_COL of MERGE_OP_ROW. */
+  private Put createPutForMergeOperation(String[] backupIdList) {
+    String joinedIds = StringUtils.join(backupIdList, ",");
+    Put mergePut = new Put(MERGE_OP_ROW);
+    mergePut.addColumn(META_FAMILY, FAM_COL, Bytes.toBytes(joinedIds));
+    return mergePut;
+  }
+
+  /**
+   * Checks whether a merge operation is recorded in the backup system table.
+   * @return true if MERGE_OP_ROW currently holds any cell, false otherwise
+   * @throws IOException if the table cannot be read
+   */
+  public boolean isMergeInProgress() throws IOException {
+    try (Table table = connection.getTable(tableName)) {
+      return !table.get(new Get(MERGE_OP_ROW)).isEmpty();
+    }
+  }
+
+  /** Builds the Put that stores the comma-joined table names in PATH_COL of MERGE_OP_ROW. */
+  private Put createPutForUpdateTablesForMerge(List<TableName> tables) {
+    String joinedTables = StringUtils.join(tables, ",");
+    Put tablesPut = new Put(MERGE_OP_ROW);
+    tablesPut.addColumn(META_FAMILY, PATH_COL, Bytes.toBytes(joinedTables));
+    return tablesPut;
+  }
+
+  /** Builds the Delete that removes the whole META_FAMILY of MERGE_OP_ROW. */
+  private Delete createDeleteForBackupMergeOperation() {
+    Delete mergeRowDelete = new Delete(MERGE_OP_ROW);
+    mergeRowDelete.addFamily(META_FAMILY);
+    return mergeRowDelete;
+  }
+
+  /** Builds the Get that fetches the whole META_FAMILY of MERGE_OP_ROW. */
+  private Get createGetForMergeOperation() {
+    Get mergeRowGet = new Get(MERGE_OP_ROW);
+    mergeRowGet.addFamily(META_FAMILY);
+    return mergeRowGet;
+  }
+
+  /** Records the start of a merge by storing the given backup ids in MERGE_OP_ROW. */
+  public void startMergeOperation(String[] backupIdList) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList, ","));
+    }
+    try (Table table = connection.getTable(tableName)) {
+      table.put(createPutForMergeOperation(backupIdList));
+    }
+  }
+
+  /** Writes the given tables, comma-joined, into PATH_COL of MERGE_OP_ROW. */
+  public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Update tables for merge : " + StringUtils.join(tables, ","));
+    }
+    try (Table table = connection.getTable(tableName)) {
+      table.put(createPutForUpdateTablesForMerge(tables));
+    }
+  }
+
+  /** Marks the merge as finished by deleting the META_FAMILY cells of MERGE_OP_ROW. */
+  public void finishMergeOperation() throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Finish merge operation");
+    }
+    try (Table table = connection.getTable(tableName)) {
+      table.delete(createDeleteForBackupMergeOperation());
+    }
+  }
+
+  /**
+   * Reads the backup ids stored by startMergeOperation.
+   * @return backup ids, or null if the ids column is absent or empty
+   */
+  public String[] getListOfBackupIdsFromMergeOperation() throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Get backup ids for merge operation");
+    }
+    try (Table table = connection.getTable(tableName)) {
+      Result res = table.get(createGetForMergeOperation());
+      // Read FAM_COL by name; the row may also carry PATH_COL (processed tables)
+      byte[] val = res.getValue(META_FAMILY, FAM_COL);
+      if (val == null || val.length == 0) {
+        return null;
+      }
+      return Bytes.toString(val).split(","); // ids were written via Bytes.toBytes
+    }
+  }
+
   static Scan createScanForOrigBulkLoadedFiles(TableName table) throws IOException {
     Scan scan = new Scan();
     byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM);
@@ -1776,10 +1861,12 @@ public final class BackupSystemTable implements Closeable {
     scan.setMaxVersions(1);
     return scan;
   }
+
   static String getTableNameFromOrigBulkLoadRow(String rowStr) {
     String[] parts = rowStr.split(BLK_LD_DELIM);
     return parts[1];
   }
+
   static String getRegionNameFromOrigBulkLoadRow(String rowStr) {
     // format is bulk : namespace : table : region : file
     String[] parts = rowStr.split(BLK_LD_DELIM);
@@ -1791,6 +1878,7 @@ public final class BackupSystemTable implements Closeable {
     LOG.debug("bulk row string " + rowStr + " region " + parts[idx]);
     return parts[idx];
   }
+
   /*
    * Used to query bulk loaded hfiles which have been copied by incremental backup
    * @param backupId the backup Id. It can be null when querying for all tables
@@ -1798,13 +1886,14 @@ public final class BackupSystemTable implements Closeable {
    */
   static Scan createScanForBulkLoadedFiles(String backupId) throws IOException {
     Scan scan = new Scan();
-    byte[] startRow = backupId == null ? BULK_LOAD_PREFIX_BYTES :
-      rowkey(BULK_LOAD_PREFIX, backupId+BLK_LD_DELIM);
+    byte[] startRow =
+        backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId
+            + BLK_LD_DELIM);
     byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
     stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
     scan.setStartRow(startRow);
     scan.setStopRow(stopRow);
-    //scan.setTimeRange(lower, Long.MAX_VALUE);
+    // scan.setTimeRange(lower, Long.MAX_VALUE);
     scan.addFamily(BackupSystemTable.META_FAMILY);
     scan.setMaxVersions(1);
     return scan;
@@ -1812,12 +1901,13 @@ public final class BackupSystemTable implements Closeable {
 
   static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId,
       long ts, int idx) {
-    Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId+BLK_LD_DELIM+ts+BLK_LD_DELIM+idx));
+    Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM + ts + BLK_LD_DELIM + idx));
     put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName());
     put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam);
     put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, p.getBytes());
     return put;
   }
+
   /**
    * Creates put list for list of WAL files
    * @param files list of WAL file paths
@@ -1825,8 +1915,9 @@ public final class BackupSystemTable implements Closeable {
    * @return put list
    * @throws IOException exception
    */
-  private List<Put> createPutsForAddWALFiles(List<String> files, String backupId,
-      String backupRoot) throws IOException {
+  private List<Put>
+      createPutsForAddWALFiles(List<String> files, String backupId, String backupRoot)
+          throws IOException {
 
     List<Put> puts = new ArrayList<Put>();
     for (String file : files) {
@@ -1957,5 +2048,4 @@ public final class BackupSystemTable implements Closeable {
     return sb.toString().getBytes();
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
index 381e9b1..ea7a7b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.HBackupFileSystem;
 import org.apache.hadoop.hbase.backup.RestoreRequest;
 import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
-import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreJob;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.backup.util.RestoreTool;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
@@ -58,7 +58,6 @@ public class RestoreTablesClient {
   private Configuration conf;
   private Connection conn;
   private String backupId;
-  private String fullBackupId;
   private TableName[] sTableArray;
   private TableName[] tTableArray;
   private String targetRootDir;
@@ -107,8 +106,7 @@ public class RestoreTablesClient {
 
     if (existTableList.size() > 0) {
       if (!isOverwrite) {
-        LOG.error("Existing table ("
-            + existTableList
+        LOG.error("Existing table (" + existTableList
             + ") found in the restore target, please add "
             + "\"-overwrite\" option in the command if you mean"
             + " to restore to these existing tables");
@@ -148,9 +146,8 @@ public class RestoreTablesClient {
     Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId);
     String lastIncrBackupId = images.length == 1 ? null : images[images.length - 1].getBackupId();
     // We need hFS only for full restore (see the code)
-    BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId);
+    BackupManifest manifest = HBackupFileSystem.getManifest(conf, backupRoot, backupId);
     if (manifest.getType() == BackupType.FULL) {
-      fullBackupId = manifest.getBackupImage().getBackupId();
       LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full" + " backup image "
           + tableBackupPath.toString());
       restoreTool.fullRestoreTable(conn, tableBackupPath, sTable, tTable, truncateIfExists,
@@ -169,8 +166,8 @@ public class RestoreTablesClient {
     // full backup path comes first
     for (int i = 1; i < images.length; i++) {
       BackupImage im = images[i];
-      String fileBackupDir = HBackupFileSystem.getTableBackupDir(im.getRootDir(),
-                  im.getBackupId(), sTable)+ Path.SEPARATOR+"data";
+      String fileBackupDir =
+          HBackupFileSystem.getTableBackupDataDir(im.getRootDir(), im.getBackupId(), sTable);
       dirList.add(new Path(fileBackupDir));
     }
 
@@ -196,8 +193,10 @@ public class RestoreTablesClient {
     TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>();
     boolean truncateIfExists = isOverwrite;
     Set<String> backupIdSet = new HashSet<>();
+
     for (int i = 0; i < sTableArray.length; i++) {
       TableName table = sTableArray[i];
+
       BackupManifest manifest = backupManifestMap.get(table);
       // Get the image list of this backup for restore in time order from old
       // to new.
@@ -213,11 +212,8 @@ public class RestoreTablesClient {
       if (restoreImageSet != null && !restoreImageSet.isEmpty()) {
         LOG.info("Restore includes the following image(s):");
         for (BackupImage image : restoreImageSet) {
-          LOG.info("Backup: "
-              + image.getBackupId()
-              + " "
-              + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(),
-                  table));
+          LOG.info("Backup: " + image.getBackupId() + " "
+              + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), table));
           if (image.getType() == BackupType.INCREMENTAL) {
             backupIdSet.add(image.getBackupId());
             LOG.debug("adding " + image.getBackupId() + " for bulk load");
@@ -232,13 +228,13 @@ public class RestoreTablesClient {
         Map<byte[], List<Path>>[] mapForSrc = table.readBulkLoadedFiles(id, sTableList);
         Map<LoadQueueItem, ByteBuffer> loaderResult;
         conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
-        LoadIncrementalHFiles loader = MapReduceRestoreJob.createLoader(conf);
+        LoadIncrementalHFiles loader = BackupUtils.createLoader(conf);
         for (int i = 0; i < sTableList.size(); i++) {
           if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) {
             loaderResult = loader.run(null, mapForSrc[i], tTableArray[i]);
             LOG.debug("bulk loading " + sTableList.get(i) + " to " + tTableArray[i]);
             if (loaderResult.isEmpty()) {
-              String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " +tTableArray[i];
+              String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " + tTableArray[i];
               LOG.error(msg);
               throw new IOException(msg);
             }
@@ -253,7 +249,7 @@ public class RestoreTablesClient {
     if (backupId == null) {
       return 0;
     }
-    return Long.parseLong(backupId.substring(backupId.lastIndexOf("_")+1));
+    return Long.parseLong(backupId.substring(backupId.lastIndexOf("_") + 1));
   }
 
   static boolean withinRange(long a, long lower, long upper) {
@@ -268,15 +264,15 @@ public class RestoreTablesClient {
     // case VALIDATION:
     // check the target tables
     checkTargetTables(tTableArray, isOverwrite);
+
     // case RESTORE_IMAGES:
     HashMap<TableName, BackupManifest> backupManifestMap = new HashMap<>();
     // check and load backup image manifest for the tables
     Path rootPath = new Path(targetRootDir);
     HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath,
       backupId);
+
     restore(backupManifestMap, sTableArray, tTableArray, isOverwrite);
   }
 
-
-
 }


[4/4] hbase git commit: HBASE-14135 Merge backup images (Vladimir Rodionov)

Posted by el...@apache.org.
HBASE-14135 Merge backup images (Vladimir Rodionov)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/35aa7aae
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/35aa7aae
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/35aa7aae

Branch: refs/heads/branch-2
Commit: 35aa7aae3a0d269d809416f6ff24599517f5b44b
Parents: b4d4446
Author: Josh Elser <el...@apache.org>
Authored: Sun Aug 13 20:55:58 2017 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Sun Aug 13 21:16:39 2017 -0400

----------------------------------------------------------------------
 .../apache/hadoop/hbase/backup/BackupAdmin.java |  20 +-
 .../hadoop/hbase/backup/BackupDriver.java       |   2 +
 .../apache/hadoop/hbase/backup/BackupInfo.java  |   5 +
 .../hadoop/hbase/backup/BackupMergeJob.java     |  40 +++
 .../hbase/backup/BackupRestoreFactory.java      |  20 +-
 .../hadoop/hbase/backup/HBackupFileSystem.java  |  57 ++--
 .../hbase/backup/impl/BackupAdminImpl.java      | 213 +++++++++---
 .../hbase/backup/impl/BackupCommands.java       | 163 ++++++---
 .../hadoop/hbase/backup/impl/BackupManager.java |  21 +-
 .../hbase/backup/impl/BackupManifest.java       |  24 +-
 .../hbase/backup/impl/BackupSystemTable.java    | 314 ++++++++++-------
 .../hbase/backup/impl/RestoreTablesClient.java  |  32 +-
 .../backup/mapreduce/HFileSplitterJob.java      | 181 ----------
 .../mapreduce/MapReduceBackupMergeJob.java      | 321 ++++++++++++++++++
 .../mapreduce/MapReduceHFileSplitterJob.java    | 181 ++++++++++
 .../backup/mapreduce/MapReduceRestoreJob.java   |  84 ++---
 .../hadoop/hbase/backup/util/BackupUtils.java   |  93 +++--
 .../TestIncrementalBackupMergeWithFailures.java | 336 +++++++++++++++++++
 .../backup/TestRepairAfterFailedDelete.java     |   2 +-
 19 files changed, 1574 insertions(+), 535 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java
index 6f642a4..9dc6382 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java
@@ -38,8 +38,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 public interface BackupAdmin extends Closeable {
 
   /**
-   * Backup given list of tables fully. This is a synchronous operation.
-   * It returns backup id on success or throw exception on failure.
+   * Backup given list of tables fully. This is a synchronous operation. It returns backup id on
+   * success or throw exception on failure.
    * @param userRequest BackupRequest instance
    * @return the backup Id
    */
@@ -61,16 +61,24 @@ public interface BackupAdmin extends Closeable {
    */
   BackupInfo getBackupInfo(String backupId) throws IOException;
 
-
   /**
    * Delete backup image command
-   * @param backupIds backup id list
+   * @param backupIds array of backup ids
    * @return total number of deleted sessions
    * @throws IOException exception
    */
   int deleteBackups(String[] backupIds) throws IOException;
 
   /**
+   * Merge backup images command
+   * @param backupIds array of backup ids of images to be merged
+   *        The resulting backup image will have the same backup id as the most
+   *        recent image from a list of images to be merged
+   * @throws IOException exception
+   */
+  void mergeBackups(String[] backupIds) throws IOException;
+
+  /**
    * Show backup history command
    * @param n last n backup sessions
    * @return list of backup info objects
@@ -113,7 +121,7 @@ public interface BackupAdmin extends Closeable {
   /**
    * Add tables to backup set command
    * @param name name of backup set.
-   * @param tables list of tables to be added to this set.
+   * @param tables array of tables to be added to this set.
    * @throws IOException exception
    */
   void addToBackupSet(String name, TableName[] tables) throws IOException;
@@ -121,7 +129,7 @@ public interface BackupAdmin extends Closeable {
   /**
    * Remove tables from backup set
    * @param name name of backup set.
-   * @param tables list of tables to be removed from this set.
+   * @param tables array of tables to be removed from this set.
    * @throws IOException exception
    */
   void removeFromBackupSet(String name, TableName[] tables) throws IOException;

http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java
index e2cdb2f..9dd8531 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java
@@ -111,6 +111,8 @@ public class BackupDriver extends AbstractHBaseTool {
       type = BackupCommand.SET;
     } else if (BackupCommand.REPAIR.name().equalsIgnoreCase(cmd)) {
       type = BackupCommand.REPAIR;
+    } else if (BackupCommand.MERGE.name().equalsIgnoreCase(cmd)) {
+      type = BackupCommand.MERGE;
     } else {
       System.out.println("Unsupported command for backup: " + cmd);
       printToolUsage();

http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
index f6a1fe4..1765bf3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
@@ -433,6 +433,11 @@ public class BackupInfo implements Comparable<BackupInfo> {
     }
   }
 
+  /** Returns the backup id as the string form of this BackupInfo. */
+  @Override
+  public String toString() {
+    return backupId;
+  }
+
   public byte[] toByteArray() throws IOException {
     return toProtosBackupInfo().toByteArray();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupMergeJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupMergeJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupMergeJob.java
new file mode 100644
index 0000000..136782f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupMergeJob.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Backup merge operation job interface. The concrete implementation is supplied by the backup
+ * provider and obtained through {@link BackupRestoreFactory#getBackupMergeJob}.
+ */
+
+@InterfaceAudience.Private
+public interface BackupMergeJob extends Configurable {
+
+  /**
+   * Runs the backup merge operation.
+   * @param backupIds ids of the backup images to merge
+   * @throws IOException if the merge operation fails
+   */
+  void run(String[] backupIds) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java
index 6d8967a..d72c884 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.backup;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
+import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob;
 import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreJob;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -32,6 +33,7 @@ public final class BackupRestoreFactory {
 
   public final static String HBASE_INCR_RESTORE_IMPL_CLASS = "hbase.incremental.restore.class";
   public final static String HBASE_BACKUP_COPY_IMPL_CLASS = "hbase.backup.copy.class";
+  public final static String HBASE_BACKUP_MERGE_IMPL_CLASS = "hbase.backup.merge.class";
 
   private BackupRestoreFactory() {
     throw new AssertionError("Instantiating utility class...");
@@ -40,7 +42,7 @@ public final class BackupRestoreFactory {
   /**
    * Gets backup restore job
    * @param conf configuration
-   * @return backup restore task instance
+   * @return backup restore job instance
    */
   public static RestoreJob getRestoreJob(Configuration conf) {
     Class<? extends RestoreJob> cls =
@@ -53,7 +55,7 @@ public final class BackupRestoreFactory {
   /**
    * Gets backup copy job
    * @param conf configuration
-   * @return backup copy task
+   * @return backup copy job instance
    */
   public static BackupCopyJob getBackupCopyJob(Configuration conf) {
     Class<? extends BackupCopyJob> cls =
@@ -63,4 +65,18 @@ public final class BackupRestoreFactory {
     service.setConf(conf);
     return service;
   }
+
+  /**
+   * Creates the backup merge job configured by {@link #HBASE_BACKUP_MERGE_IMPL_CLASS}
+   * @param conf configuration; {@link MapReduceBackupMergeJob} is used when no class is set
+   * @return backup merge job instance with {@code conf} applied
+   */
+  public static BackupMergeJob getBackupMergeJob(Configuration conf) {
+    Class<? extends BackupMergeJob> jobClass =
+        conf.getClass(HBASE_BACKUP_MERGE_IMPL_CLASS, MapReduceBackupMergeJob.class,
+          BackupMergeJob.class);
+    BackupMergeJob mergeJob = ReflectionUtils.newInstance(jobClass, conf);
+    mergeJob.setConf(conf);
+    return mergeJob;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
index 46044db..1c43e88 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
@@ -49,8 +49,8 @@ public class HBackupFileSystem {
   /**
    * Given the backup root dir, backup id and the table name, return the backup image location,
    * which is also where the backup manifest file is. return value look like:
-   * "hdfs://backup.hbase.org:9000/user/biadmin/backup/backup_1396650096738/default/t1_dn/",
-   * where "hdfs://backup.hbase.org:9000/user/biadmin/backup" is a backup root directory
+   * "hdfs://backup.hbase.org:9000/user/biadmin/backup/backup_1396650096738/default/t1_dn/", where
+   * "hdfs://backup.hbase.org:9000/user/biadmin/backup" is a backup root directory
    * @param backupRootDir backup root directory
    * @param backupId backup id
    * @param tableName table name
@@ -63,18 +63,26 @@ public class HBackupFileSystem {
         + Path.SEPARATOR;
   }
 
+  public static String getTableBackupDataDir(String backupRootDir, String backupId,
+      TableName tableName) {
+    return getTableBackupDir(backupRootDir, backupId, tableName) + Path.SEPARATOR + "data";
+  }
+
+  public static Path getBackupPath(String backupRootDir, String backupId) {
+    return new Path(backupRootDir + Path.SEPARATOR + backupId);
+  }
+
   /**
    * Given the backup root dir, backup id and the table name, return the backup image location,
    * which is also where the backup manifest file is. return value look like:
-   * "hdfs://backup.hbase.org:9000/user/biadmin/backup/backup_1396650096738/default/t1_dn/",
-   * where "hdfs://backup.hbase.org:9000/user/biadmin/backup" is a backup root directory
+   * "hdfs://backup.hbase.org:9000/user/biadmin/backup/backup_1396650096738/default/t1_dn/", where
+   * "hdfs://backup.hbase.org:9000/user/biadmin/backup" is a backup root directory
    * @param backupRootPath backup root path
    * @param tableName table name
    * @param backupId backup Id
    * @return backupPath for the particular table
    */
-  public static Path getTableBackupPath(TableName tableName,
-      Path backupRootPath, String backupId) {
+  public static Path getTableBackupPath(TableName tableName, Path backupRootPath, String backupId) {
     return new Path(getTableBackupDir(backupRootPath.toString(), backupId, tableName));
   }
 
@@ -94,33 +102,30 @@ public class HBackupFileSystem {
     return new Path(getLogBackupDir(backupRootDir, backupId));
   }
 
-  private static Path getManifestPath(TableName tableName, Configuration conf, Path backupRootPath,
-      String backupId) throws IOException {
-    Path manifestPath =
-        new Path(getTableBackupPath(tableName, backupRootPath, backupId),
-            BackupManifest.MANIFEST_FILE_NAME);
+  // TODO we do not keep WAL files anymore
+  // Move manifest file to other place
+  private static Path getManifestPath(Configuration conf, Path backupRootPath, String backupId)
+      throws IOException {
+    Path manifestPath = null;
 
     FileSystem fs = backupRootPath.getFileSystem(conf);
+    manifestPath =
+        new Path(getBackupPath(backupRootPath.toString(), backupId) + Path.SEPARATOR
+            + BackupManifest.MANIFEST_FILE_NAME);
     if (!fs.exists(manifestPath)) {
-      // check log dir for incremental backup case
-      manifestPath =
-          new Path(getLogBackupDir(backupRootPath.toString(), backupId) + Path.SEPARATOR
-              + BackupManifest.MANIFEST_FILE_NAME);
-      if (!fs.exists(manifestPath)) {
-        String errorMsg =
-            "Could not find backup manifest " + BackupManifest.MANIFEST_FILE_NAME + " for "
-                + backupId + ". File " + manifestPath + " does not exists. Did " + backupId
-                + " correspond to previously taken backup ?";
-        throw new IOException(errorMsg);
-      }
+      String errorMsg =
+          "Could not find backup manifest " + BackupManifest.MANIFEST_FILE_NAME + " for "
+              + backupId + ". File " + manifestPath + " does not exists. Did " + backupId
+              + " correspond to previously taken backup ?";
+      throw new IOException(errorMsg);
     }
     return manifestPath;
   }
 
-  public static BackupManifest getManifest(TableName tableName, Configuration conf,
-      Path backupRootPath, String backupId) throws IOException {
+  public static BackupManifest
+      getManifest(Configuration conf, Path backupRootPath, String backupId) throws IOException {
     BackupManifest manifest =
-        new BackupManifest(conf, getManifestPath(tableName, conf, backupRootPath, backupId));
+        new BackupManifest(conf, getManifestPath(conf, backupRootPath, backupId));
     return manifest;
   }
 
@@ -134,7 +139,7 @@ public class HBackupFileSystem {
       TableName[] tableArray, Configuration conf, Path backupRootPath, String backupId)
       throws IOException {
     for (TableName tableName : tableArray) {
-      BackupManifest manifest = getManifest(tableName, conf, backupRootPath, backupId);
+      BackupManifest manifest = getManifest(conf, backupRootPath, backupId);
       backupManifestMap.put(tableName, manifest);
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
index 6e35d92..99fb06c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.backup.impl;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -36,8 +37,10 @@ import org.apache.hadoop.hbase.backup.BackupAdmin;
 import org.apache.hadoop.hbase.backup.BackupClientFactory;
 import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
+import org.apache.hadoop.hbase.backup.BackupMergeJob;
 import org.apache.hadoop.hbase.backup.BackupRequest;
 import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.HBackupFileSystem;
 import org.apache.hadoop.hbase.backup.RestoreRequest;
@@ -46,9 +49,8 @@ import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 @InterfaceAudience.Private
 public class BackupAdminImpl implements BackupAdmin {
@@ -65,12 +67,8 @@ public class BackupAdminImpl implements BackupAdmin {
 
   @Override
   public void close() throws IOException {
-    if (conn != null) {
-      conn.close();
-    }
   }
 
-
   @Override
   public BackupInfo getBackupInfo(String backupId) throws IOException {
     BackupInfo backupInfo = null;
@@ -105,12 +103,12 @@ public class BackupAdminImpl implements BackupAdmin {
       // is running by using startBackupSession API
       // If there is an active session in progress, exception will be thrown
       try {
-        sysTable.startBackupSession();
+        sysTable.startBackupExclusiveOperation();
         deleteSessionStarted = true;
       } catch (IOException e) {
         LOG.warn("You can not run delete command while active backup session is in progress. \n"
             + "If there is no active backup session running, run backup repair utility to restore \n"
-            +"backup system integrity.");
+            + "backup system integrity.");
         return -1;
       }
 
@@ -126,7 +124,7 @@ public class BackupAdminImpl implements BackupAdmin {
       sysTable.startDeleteOperation(backupIds);
       // Step 4: Snapshot backup system table
       if (!BackupSystemTable.snapshotExists(conn)) {
-          BackupSystemTable.snapshot(conn);
+        BackupSystemTable.snapshot(conn);
       } else {
         LOG.warn("Backup system table snapshot exists");
       }
@@ -154,13 +152,13 @@ public class BackupAdminImpl implements BackupAdmin {
         // Fail delete operation
         // Step 1
         if (snapshotDone) {
-          if(BackupSystemTable.snapshotExists(conn)) {
+          if (BackupSystemTable.snapshotExists(conn)) {
             BackupSystemTable.restoreFromSnapshot(conn);
             // delete snapshot
             BackupSystemTable.deleteSnapshot(conn);
             // We still have record with unfinished delete operation
-            LOG.error("Delete operation failed, please run backup repair utility to restore "+
-                       "backup system integrity", e);
+            LOG.error("Delete operation failed, please run backup repair utility to restore "
+                + "backup system integrity", e);
             throw e;
           } else {
             LOG.warn("Delete operation succeeded, there were some errors: ", e);
@@ -169,7 +167,7 @@ public class BackupAdminImpl implements BackupAdmin {
 
       } finally {
         if (deleteSessionStarted) {
-          sysTable.finishBackupSession();
+          sysTable.finishBackupExclusiveOperation();
         }
       }
     }
@@ -206,17 +204,17 @@ public class BackupAdminImpl implements BackupAdmin {
   /**
    * Delete single backup and all related backups <br>
    * Algorithm:<br>
-   *  Backup type: FULL or INCREMENTAL <br>
-   *  Is this last backup session for table T: YES or NO <br>
-   *  For every table T from table list 'tables':<br>
-   *  if(FULL, YES) deletes only physical data (PD) <br>
-   *  if(FULL, NO), deletes PD, scans all newer backups and removes T from backupInfo,<br>
-   *  until we either reach the most recent backup for T in the system or FULL backup<br>
-   *  which includes T<br>
-   *  if(INCREMENTAL, YES) deletes only physical data (PD)
-   *  if(INCREMENTAL, NO) deletes physical data and for table T scans all backup images between last<br>
-   *  FULL backup, which is older than the backup being deleted and the next FULL backup (if exists) <br>
-   *  or last one for a particular table T and removes T from list of backup tables.
+   * Backup type: FULL or INCREMENTAL <br>
+   * Is this last backup session for table T: YES or NO <br>
+   * For every table T from table list 'tables':<br>
+   * if(FULL, YES) deletes only physical data (PD) <br>
+   * if(FULL, NO), deletes PD, scans all newer backups and removes T from backupInfo,<br>
+   * until we either reach the most recent backup for T in the system or FULL backup<br>
+   * which includes T<br>
+   * if(INCREMENTAL, YES) deletes only physical data (PD)<br> if(INCREMENTAL, NO) deletes physical data
+   * and for table T scans all backup images between last<br>
+   * FULL backup, which is older than the backup being deleted and the next FULL backup (if exists) <br>
+   * or last one for a particular table T and removes T from list of backup tables.
    * @param backupId backup id
    * @param sysTable backup system table
    * @return total number of deleted backup images
@@ -285,8 +283,9 @@ public class BackupAdminImpl implements BackupAdmin {
     return totalDeleted;
   }
 
-  private void removeTableFromBackupImage(BackupInfo info, TableName tn,
-      BackupSystemTable sysTable) throws IOException {
+  private void
+      removeTableFromBackupImage(BackupInfo info, TableName tn, BackupSystemTable sysTable)
+          throws IOException {
     List<TableName> tables = info.getTableNames();
     LOG.debug("Remove " + tn + " from " + info.getBackupId() + " tables="
         + info.getTableListAsString());
@@ -485,7 +484,7 @@ public class BackupAdminImpl implements BackupAdmin {
 
   private String[] toStringArray(TableName[] list) {
     String[] arr = new String[list.length];
-    for(int i=0; i < list.length; i++) {
+    for (int i = 0; i < list.length; i++) {
       arr[i] = list[i].toString();
     }
     return arr;
@@ -521,7 +520,7 @@ public class BackupAdminImpl implements BackupAdmin {
     String targetRootDir = request.getTargetRootDir();
     List<TableName> tableList = request.getTableList();
 
-    String backupId =BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime();
+    String backupId = BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime();
     if (type == BackupType.INCREMENTAL) {
       Set<TableName> incrTableSet = null;
       try (BackupSystemTable table = new BackupSystemTable(conn)) {
@@ -529,19 +528,20 @@ public class BackupAdminImpl implements BackupAdmin {
       }
 
       if (incrTableSet.isEmpty()) {
-        String msg = "Incremental backup table set contains no tables. "
-            + "You need to run full backup first " +
-            (tableList != null ? "on "+StringUtils.join(tableList, ","): "");
+        String msg =
+            "Incremental backup table set contains no tables. "
+                + "You need to run full backup first "
+                + (tableList != null ? "on " + StringUtils.join(tableList, ",") : "");
 
         throw new IOException(msg);
       }
-      if(tableList != null) {
+      if (tableList != null) {
         tableList.removeAll(incrTableSet);
         if (!tableList.isEmpty()) {
           String extraTables = StringUtils.join(tableList, ",");
-          String msg = "Some tables (" + extraTables + ") haven't gone through full backup. "+
-            "Perform full backup on " + extraTables + " first, "
-            + "then retry the command";
+          String msg =
+              "Some tables (" + extraTables + ") haven't gone through full backup. "
+                  + "Perform full backup on " + extraTables + " first, " + "then retry the command";
           throw new IOException(msg);
         }
       }
@@ -584,14 +584,13 @@ public class BackupAdminImpl implements BackupAdmin {
 
     // update table list
     BackupRequest.Builder builder = new BackupRequest.Builder();
-    request = builder.withBackupType(request.getBackupType()).
-                      withTableList(tableList).
-                      withTargetRootDir(request.getTargetRootDir()).
-                      withBackupSetName(request.getBackupSetName()).
-                      withTotalTasks(request.getTotalTasks()).
-                      withBandwidthPerTasks((int)request.getBandwidth()).build();
-
-    TableBackupClient client =null;
+    request =
+        builder.withBackupType(request.getBackupType()).withTableList(tableList)
+            .withTargetRootDir(request.getTargetRootDir())
+            .withBackupSetName(request.getBackupSetName()).withTotalTasks(request.getTotalTasks())
+            .withBandwidthPerTasks((int) request.getBandwidth()).build();
+
+    TableBackupClient client = null;
     try {
       client = BackupClientFactory.create(conn, backupId, request);
     } catch (IOException e) {
@@ -613,4 +612,132 @@ public class BackupAdminImpl implements BackupAdmin {
     return tableList;
   }
 
+  @Override
+  public void mergeBackups(String[] backupIds) throws IOException {
+    try (final BackupSystemTable sysTable = new BackupSystemTable(conn);) {
+      checkIfValidForMerge(backupIds, sysTable);
+      BackupMergeJob job = BackupRestoreFactory.getBackupMergeJob(conn.getConfiguration());
+      job.run(backupIds);
+    }
+  }
+
+  /**
+   * Verifies that backup images are valid for merge.
+   *
+   * <ul>
+   * <li>All backups MUST be in the same destination
+   * <li>No FULL backups are allowed - only INCREMENTAL
+   * <li>All backups must be in COMPLETE state
+   * <li>No holes in backup list are allowed
+   * </ul>
+   * <p>
+   * @param backupIds list of backup ids
+   * @param table backup system table
+   * @throws IOException if a backup id is not found or the images are not valid for merge
+   */
+  private void checkIfValidForMerge(String[] backupIds, BackupSystemTable table) throws IOException {
+    String backupRoot = null;
+
+    final Set<TableName> allTables = new HashSet<TableName>();
+    final Set<String> allBackups = new HashSet<String>();
+    long minTime = Long.MAX_VALUE, maxTime = Long.MIN_VALUE;
+    for (String backupId : backupIds) {
+      BackupInfo bInfo = table.readBackupInfo(backupId);
+      if (bInfo == null) {
+        String msg = "Backup session " + backupId + " not found";
+        throw new IOException(msg);
+      }
+      if (backupRoot == null) {
+        backupRoot = bInfo.getBackupRootDir();
+      } else if (!bInfo.getBackupRootDir().equals(backupRoot)) {
+        throw new IOException("Found different backup destinations in a list of a backup sessions \n"
+            + "1. " + backupRoot + "\n" + "2. " + bInfo.getBackupRootDir());
+      }
+      if (bInfo.getType() == BackupType.FULL) {
+        throw new IOException("FULL backup image can not be merged for: \n" + bInfo);
+      }
+
+      if (bInfo.getState() != BackupState.COMPLETE) {
+        throw new IOException("Backup image " + backupId
+            + " can not be merged becuase of its state: " + bInfo.getState());
+      }
+      allBackups.add(backupId);
+      allTables.addAll(bInfo.getTableNames());
+      long time = bInfo.getStartTs();
+      if (time < minTime) {
+        minTime = time;
+      }
+      if (time > maxTime) {
+        maxTime = time;
+      }
+    }
+
+
+    final long startRangeTime  = minTime;
+    final long endRangeTime = maxTime;
+    final String backupDest = backupRoot;
+    // Check we have no 'holes' in backup id list
+    // Filter 1 : backupRoot
+    // Filter 2 : time range filter
+    // Filter 3 : table filter
+
+    BackupInfo.Filter destinationFilter = new  BackupInfo.Filter() {
+
+      @Override
+      public boolean apply(BackupInfo info) {
+        return info.getBackupRootDir().equals(backupDest);
+      }
+    };
+
+    BackupInfo.Filter timeRangeFilter = new  BackupInfo.Filter() {
+
+      @Override
+      public boolean apply(BackupInfo info) {
+        long time = info.getStartTs();
+        return time >= startRangeTime && time <= endRangeTime ;
+      }
+    };
+
+    BackupInfo.Filter tableFilter = new  BackupInfo.Filter() {
+
+      @Override
+      public boolean apply(BackupInfo info) {
+        List<TableName> tables = info.getTableNames();
+        return !Collections.disjoint(allTables, tables);
+      }
+    };
+
+    BackupInfo.Filter typeFilter = new  BackupInfo.Filter() {
+
+      @Override
+      public boolean apply(BackupInfo info) {
+        return info.getType() == BackupType.INCREMENTAL;
+      }
+    };
+
+    BackupInfo.Filter stateFilter = new  BackupInfo.Filter() {
+      @Override
+      public boolean apply(BackupInfo info) {
+        return info.getState() == BackupState.COMPLETE;
+      }
+    };
+
+    List<BackupInfo> allInfos =
+        table.getBackupHistory( -1, destinationFilter,
+          timeRangeFilter, tableFilter, typeFilter, stateFilter);
+    if (allInfos.size() != allBackups.size()) {
+      // Yes we have at least one hole in backup image sequence
+      List<String> missingIds = new ArrayList<String>();
+      for(BackupInfo info: allInfos) {
+        if(allBackups.contains(info.getBackupId())) {
+          continue;
+        }
+        missingIds.add(info.getBackupId());
+      }
+      String errMsg =
+          "Sequence of backup ids has 'holes'. The following backup images must be added:" +
+           org.apache.hadoop.util.StringUtils.join(",", missingIds);
+      throw new IOException(errMsg);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
index aa15fba..650ba2e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
@@ -59,16 +59,15 @@ import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
  * General backup commands, options and usage messages
  */
 
 @InterfaceAudience.Private
-public final class BackupCommands  {
+public final class BackupCommands {
 
   public final static String INCORRECT_USAGE = "Incorrect usage";
 
@@ -79,7 +78,8 @@ public final class BackupCommands  {
       + "  history    show history of all successful backups\n"
       + "  progress   show the progress of the latest backup request\n"
       + "  set        backup set management\n"
-      + "  repair     repair backup system table"
+      + "  repair     repair backup system table\n"
+      + "  merge      merge backup images\n"
       + "Run \'hbase backup COMMAND -h\' to see help message for each command\n";
 
   public static final String CREATE_CMD_USAGE =
@@ -109,17 +109,20 @@ public final class BackupCommands  {
 
   public static final String SET_CMD_USAGE = "Usage: hbase backup set COMMAND [name] [tables]\n"
       + "  name            Backup set name\n"
-      + "  tables          Comma separated list of tables.\n"
-      + "COMMAND is one of:\n" + "  add             add tables to a set, create a set if needed\n"
+      + "  tables          Comma separated list of tables.\n" + "COMMAND is one of:\n"
+      + "  add             add tables to a set, create a set if needed\n"
       + "  remove          remove tables from a set\n"
       + "  list            list all backup sets in the system\n"
       + "  describe        describe set\n" + "  delete          delete backup set\n";
+  public static final String MERGE_CMD_USAGE = "Usage: hbase backup merge [backup_ids]\n"
+      + "  backup_ids      Comma separated list of backup image ids.\n";
 
   public static final String USAGE_FOOTER = "";
 
   public static abstract class Command extends Configured {
     CommandLine cmdline;
     Connection conn;
+
     Command(Configuration conf) {
       if (conf == null) {
         conf = HBaseConfiguration.create();
@@ -140,7 +143,7 @@ public final class BackupCommands  {
         try (BackupSystemTable table = new BackupSystemTable(conn);) {
           List<BackupInfo> sessions = table.getBackupInfos(BackupState.RUNNING);
 
-          if(sessions.size() > 0) {
+          if (sessions.size() > 0) {
             System.err.println("Found backup session in a RUNNING state: ");
             System.err.println(sessions.get(0));
             System.err.println("This may indicate that a previous session has failed abnormally.");
@@ -154,11 +157,19 @@ public final class BackupCommands  {
         try (BackupSystemTable table = new BackupSystemTable(conn);) {
           String[] ids = table.getListOfBackupIdsFromDeleteOperation();
 
-          if(ids !=null && ids.length > 0) {
-            System.err.println("Found failed backup delete coommand. ");
+          if (ids != null && ids.length > 0) {
+            System.err.println("Found failed backup DELETE coommand. ");
             System.err.println("Backup system recovery is required.");
-            throw new IOException("Failed backup delete found, aborted command execution");
+            throw new IOException("Failed backup DELETE found, aborted command execution");
           }
+
+          ids = table.getListOfBackupIdsFromMergeOperation();
+          if (ids != null && ids.length > 0) {
+            System.err.println("Found failed backup MERGE coommand. ");
+            System.err.println("Backup system recovery is required.");
+            throw new IOException("Failed backup MERGE found, aborted command execution");
+          }
+
         }
       }
     }
@@ -178,10 +189,10 @@ public final class BackupCommands  {
     protected boolean requiresNoActiveSession() {
       return false;
     }
+
     /**
-     * Command requires consistent state of a backup system
-     * Backup system may become inconsistent because of an abnormal
-     * termination of a backup session or delete command
+     * Command requires consistent state of a backup system. Backup system may become inconsistent
+     * because of an abnormal termination of a backup session or delete command
      * @return true, if yes
      */
     protected boolean requiresConsistentState() {
@@ -220,6 +231,9 @@ public final class BackupCommands  {
     case REPAIR:
       cmd = new RepairCommand(conf, cmdline);
       break;
+    case MERGE:
+      cmd = new MergeCommand(conf, cmdline);
+      break;
     case HELP:
     default:
       cmd = new HelpCommand(conf, cmdline);
@@ -257,7 +271,7 @@ public final class BackupCommands  {
         throw new IOException(INCORRECT_USAGE);
       }
       String[] args = cmdline.getArgs();
-      if (args.length !=3) {
+      if (args.length != 3) {
         printUsage();
         throw new IOException(INCORRECT_USAGE);
       }
@@ -274,7 +288,6 @@ public final class BackupCommands  {
         throw new IOException(INCORRECT_USAGE);
       }
 
-
       String tables = null;
 
       // Check if we have both: backup set and list of tables
@@ -310,14 +323,14 @@ public final class BackupCommands  {
 
       try (BackupAdminImpl admin = new BackupAdminImpl(conn);) {
 
-       BackupRequest.Builder builder = new BackupRequest.Builder();
-       BackupRequest request = builder.withBackupType(BackupType.valueOf(args[1].toUpperCase()))
-            .withTableList(tables != null ?
-                          Lists.newArrayList(BackupUtils.parseTableNames(tables)) : null)
-            .withTargetRootDir(args[2])
-            .withTotalTasks(workers)
-            .withBandwidthPerTasks(bandwidth)
-            .withBackupSetName(setName).build();
+        BackupRequest.Builder builder = new BackupRequest.Builder();
+        BackupRequest request =
+            builder
+                .withBackupType(BackupType.valueOf(args[1].toUpperCase()))
+                .withTableList(
+                  tables != null ? Lists.newArrayList(BackupUtils.parseTableNames(tables)) : null)
+                .withTargetRootDir(args[2]).withTotalTasks(workers)
+                .withBandwidthPerTasks(bandwidth).withBackupSetName(setName).build();
         String backupId = admin.backupTables(request);
         System.out.println("Backup session " + backupId + " finished. Status: SUCCESS");
       } catch (IOException e) {
@@ -544,7 +557,8 @@ public final class BackupCommands  {
         int deleted = admin.deleteBackups(backupIds);
         System.out.println("Deleted " + deleted + " backups. Total requested: " + args.length);
       } catch (IOException e) {
-        System.err.println("Delete command FAILED. Please run backup repair tool to restore backup system integrity");
+        System.err
+            .println("Delete command FAILED. Please run backup repair tool to restore backup system integrity");
         throw e;
       }
 
@@ -584,8 +598,9 @@ public final class BackupCommands  {
         if (list.size() == 0) {
           // No failed sessions found
           System.out.println("REPAIR status: no failed sessions found."
-          +" Checking failed delete backup operation ...");
+              + " Checking failed delete backup operation ...");
           repairFailedBackupDeletionIfAny(conn, sysTable);
+          repairFailedBackupMergeIfAny(conn, sysTable);
           return;
         }
         backupInfo = list.get(0);
@@ -606,32 +621,55 @@ public final class BackupCommands  {
         // If backup session is updated to FAILED state - means we
         // processed recovery already.
         sysTable.updateBackupInfo(backupInfo);
-        sysTable.finishBackupSession();
-        System.out.println("REPAIR status: finished repair failed session:\n "+ backupInfo);
+        sysTable.finishBackupExclusiveOperation();
+        System.out.println("REPAIR status: finished repair failed session:\n " + backupInfo);
 
       }
     }
 
     private void repairFailedBackupDeletionIfAny(Connection conn, BackupSystemTable sysTable)
-        throws IOException
-    {
+        throws IOException {
       String[] backupIds = sysTable.getListOfBackupIdsFromDeleteOperation();
-      if (backupIds == null ||backupIds.length == 0) {
-        System.out.println("No failed backup delete operation found");
+      if (backupIds == null || backupIds.length == 0) {
+        System.out.println("No failed backup DELETE operation found");
         // Delete backup table snapshot if exists
         BackupSystemTable.deleteSnapshot(conn);
         return;
       }
-      System.out.println("Found failed delete operation for: " + StringUtils.join(backupIds));
-      System.out.println("Running delete again ...");
+      System.out.println("Found failed DELETE operation for: " + StringUtils.join(backupIds));
+      System.out.println("Running DELETE again ...");
       // Restore table from snapshot
       BackupSystemTable.restoreFromSnapshot(conn);
       // Finish previous failed session
-      sysTable.finishBackupSession();
-      try(BackupAdmin admin = new BackupAdminImpl(conn);) {
+      sysTable.finishBackupExclusiveOperation();
+      try (BackupAdmin admin = new BackupAdminImpl(conn);) {
         admin.deleteBackups(backupIds);
       }
-      System.out.println("Delete operation finished OK: "+ StringUtils.join(backupIds));
+      System.out.println("DELETE operation finished OK: " + StringUtils.join(backupIds));
+
+    }
+
+    private void repairFailedBackupMergeIfAny(Connection conn, BackupSystemTable sysTable)
+        throws IOException {
+      String[] backupIds = sysTable.getListOfBackupIdsFromMergeOperation();
+      if (backupIds == null || backupIds.length == 0) {
+        System.out.println("No failed backup MERGE operation found");
+        // Delete backup table snapshot if exists
+        BackupSystemTable.deleteSnapshot(conn);
+        return;
+      }
+      System.out.println("Found failed MERGE operation for: " + StringUtils.join(backupIds));
+      System.out.println("Running MERGE again ...");
+      // Restore table from snapshot
+      BackupSystemTable.restoreFromSnapshot(conn);
+      // Unlock backup system
+      sysTable.finishBackupExclusiveOperation();
+      // Finish previous failed session
+      sysTable.finishMergeOperation();
+      try (BackupAdmin admin = new BackupAdminImpl(conn);) {
+        admin.mergeBackups(backupIds);
+      }
+      System.out.println("MERGE operation finished OK: " + StringUtils.join(backupIds));
 
     }
 
@@ -641,6 +679,56 @@ public final class BackupCommands  {
     }
   }
 
+  private static class MergeCommand extends Command {
+
+    MergeCommand(Configuration conf, CommandLine cmdline) {
+      super(conf);
+      this.cmdline = cmdline;
+    }
+
+    @Override
+    protected boolean requiresNoActiveSession() {
+      return true;
+    }
+
+    @Override
+    protected boolean requiresConsistentState() {
+      return true;
+    }
+
+    @Override
+    public void execute() throws IOException {
+      super.execute();
+
+      String[] args = cmdline == null ? null : cmdline.getArgs();
+      if (args == null || (args.length != 2)) {
+        System.err.println("ERROR: wrong number of arguments: "
+            + (args == null ? null : args.length));
+        printUsage();
+        throw new IOException(INCORRECT_USAGE);
+      }
+
+      String[] backupIds = args[1].split(",");
+      if (backupIds.length < 2) {
+        String msg = "ERROR: can not merge a single backup image. "+
+            "Number of images must be greater than 1.";
+        System.err.println(msg);
+        throw new IOException(msg);
+
+      }
+      Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create();
+      try (final Connection conn = ConnectionFactory.createConnection(conf);
+          final BackupAdminImpl admin = new BackupAdminImpl(conn);) {
+        admin.mergeBackups(backupIds);
+      }
+    }
+
+    @Override
+    protected void printUsage() {
+      System.out.println(MERGE_CMD_USAGE);
+    }
+  }
+
   // TODO Cancel command
 
   private static class CancelCommand extends Command {
@@ -672,7 +760,6 @@ public final class BackupCommands  {
     @Override
     public void execute() throws IOException {
 
-
       int n = parseHistoryLength();
       final TableName tableName = getTableName();
       final String setName = getTableSetName();
@@ -883,7 +970,7 @@ public final class BackupCommands  {
 
     private TableName[] toTableNames(String[] tables) {
       TableName[] arr = new TableName[tables.length];
-      for (int i=0; i < tables.length; i++) {
+      for (int i = 0; i < tables.length; i++) {
         arr[i] = TableName.valueOf(tables[i]);
       }
       return arr;

http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index bf80506..8fe5eaf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -115,8 +115,8 @@ public class BackupManager implements Closeable {
     }
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Added log cleaner: " + cleanerClass +"\n" +
-                "Added master procedure manager: " + masterProcedureClass);
+      LOG.debug("Added log cleaner: " + cleanerClass + "\n" + "Added master procedure manager: "
+          + masterProcedureClass);
     }
 
   }
@@ -185,9 +185,8 @@ public class BackupManager implements Closeable {
    * @return BackupInfo
    * @throws BackupException exception
    */
-  public BackupInfo createBackupInfo(String backupId, BackupType type,
-      List<TableName> tableList, String targetRootDir, int workers, long bandwidth)
-      throws BackupException {
+  public BackupInfo createBackupInfo(String backupId, BackupType type, List<TableName> tableList,
+      String targetRootDir, int workers, long bandwidth) throws BackupException {
     if (targetRootDir == null) {
       throw new BackupException("Wrong backup request parameter: target backup root directory");
     }
@@ -313,7 +312,7 @@ public class BackupManager implements Closeable {
           }
         } else {
           Path logBackupPath =
-              HBackupFileSystem.getLogBackupPath(backup.getBackupRootDir(), backup.getBackupId());
+              HBackupFileSystem.getBackupPath(backup.getBackupRootDir(), backup.getBackupId());
           LOG.debug("Current backup has an incremental backup ancestor, "
               + "touching its image manifest in " + logBackupPath.toString()
               + " to construct the dependency.");
@@ -371,7 +370,7 @@ public class BackupManager implements Closeable {
    * @throws IOException if active session already exists
    */
   public void startBackupSession() throws IOException {
-    systemTable.startBackupSession();
+    systemTable.startBackupExclusiveOperation();
   }
 
   /**
@@ -379,10 +378,9 @@ public class BackupManager implements Closeable {
    * @throws IOException if no active session
    */
   public void finishBackupSession() throws IOException {
-    systemTable.finishBackupSession();
+    systemTable.finishBackupExclusiveOperation();
   }
 
-
   /**
    * Read the last backup start code (timestamp) of last successful backup. Will return null if
    * there is no startcode stored in backup system table or the value is of length 0. These two
@@ -413,7 +411,7 @@ public class BackupManager implements Closeable {
   }
 
   public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
-  readBulkloadRows(List<TableName> tableList) throws IOException {
+      readBulkloadRows(List<TableName> tableList) throws IOException {
     return systemTable.readBulkloadRows(tableList);
   }
 
@@ -448,8 +446,7 @@ public class BackupManager implements Closeable {
    */
   public void writeRegionServerLogTimestamp(Set<TableName> tables,
       HashMap<String, Long> newTimestamps) throws IOException {
-    systemTable.writeRegionServerLogTimestamp(tables, newTimestamps,
-      backupInfo.getBackupRootDir());
+    systemTable.writeRegionServerLogTimestamp(tables, newTimestamps, backupInfo.getBackupRootDir());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
index b8adac9..7e3201e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -61,9 +62,8 @@ public class BackupManifest {
   public static final String MANIFEST_FILE_NAME = ".backup.manifest";
 
   /**
-   *  Backup image, the dependency graph is made up by series of backup images
-   *  BackupImage contains all the relevant information to restore the backup and
-   *  is used during restore operation
+   * Backup image, the dependency graph is made up by series of backup images BackupImage contains
+   * all the relevant information to restore the backup and is used during restore operation
    */
 
   public static class BackupImage implements Comparable<BackupImage> {
@@ -294,6 +294,16 @@ public class BackupManifest {
       return this.ancestors;
     }
 
+    public void removeAncestors(List<String> backupIds) {
+      List<BackupImage> toRemove = new ArrayList<BackupImage>();
+      for (BackupImage im : this.ancestors) {
+        if (backupIds.contains(im.getBackupId())) {
+          toRemove.add(im);
+        }
+      }
+      this.ancestors.removeAll(toRemove);
+    }
+
     private void addAncestor(BackupImage backupImage) {
       this.getAncestors().add(backupImage);
     }
@@ -464,18 +474,16 @@ public class BackupManifest {
   }
 
   /**
-   * Persist the manifest file.
+   * TODO: fix it. Persist the manifest file.
    * @throws IOException IOException when storing the manifest file.
    */
 
   public void store(Configuration conf) throws BackupException {
     byte[] data = backupImage.toProto().toByteArray();
     // write the file, overwrite if already exist
-    String logBackupDir =
-        BackupUtils.getLogBackupDir(backupImage.getRootDir(), backupImage.getBackupId());
     Path manifestFilePath =
-        new Path(new Path((tableBackupDir != null ? tableBackupDir : logBackupDir)),
-            MANIFEST_FILE_NAME);
+        new Path(HBackupFileSystem.getBackupPath(backupImage.getRootDir(),
+          backupImage.getBackupId()), MANIFEST_FILE_NAME);
     try (FSDataOutputStream out =
         manifestFilePath.getFileSystem(conf).create(manifestFilePath, true);) {
       out.write(data);

http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index e5a3daa..4dab046 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.util.Pair;
  * value = backupId and full WAL file name</li>
  * </ul></p>
  */
+
 @InterfaceAudience.Private
 public final class BackupSystemTable implements Closeable {
   private static final Log LOG = LogFactory.getLog(BackupSystemTable.class);
@@ -118,7 +119,7 @@ public final class BackupSystemTable implements Closeable {
 
   private TableName tableName;
   /**
-   *  Stores backup sessions (contexts)
+   * Stores backup sessions (contexts)
    */
   final static byte[] SESSIONS_FAMILY = "session".getBytes();
   /**
@@ -127,11 +128,10 @@ public final class BackupSystemTable implements Closeable {
   final static byte[] META_FAMILY = "meta".getBytes();
   final static byte[] BULK_LOAD_FAMILY = "bulk".getBytes();
   /**
-   *  Connection to HBase cluster, shared among all instances
+   * Connection to HBase cluster, shared among all instances
    */
   private final Connection connection;
 
-
   private final static String BACKUP_INFO_PREFIX = "session:";
   private final static String START_CODE_ROW = "startcode:";
   private final static byte[] ACTIVE_SESSION_ROW = "activesession:".getBytes();
@@ -147,6 +147,7 @@ public final class BackupSystemTable implements Closeable {
   private final static String BULK_LOAD_PREFIX = "bulk:";
   private final static byte[] BULK_LOAD_PREFIX_BYTES = BULK_LOAD_PREFIX.getBytes();
   private final static byte[] DELETE_OP_ROW = "delete_op_row".getBytes();
+  private final static byte[] MERGE_OP_ROW = "merge_op_row".getBytes();
 
   final static byte[] TBL_COL = Bytes.toBytes("tbl");
   final static byte[] FAM_COL = Bytes.toBytes("fam");
@@ -160,7 +161,7 @@ public final class BackupSystemTable implements Closeable {
   private final static String SET_KEY_PREFIX = "backupset:";
 
   // separator between BULK_LOAD_PREFIX and ordinals
- protected final static String BLK_LD_DELIM = ":";
+  protected final static String BLK_LD_DELIM = ":";
   private final static byte[] EMPTY_VALUE = new byte[] {};
 
   // Safe delimiter in a string
@@ -187,19 +188,19 @@ public final class BackupSystemTable implements Closeable {
   }
 
   private void verifyNamespaceExists(Admin admin) throws IOException {
-      String namespaceName  = tableName.getNamespaceAsString();
-      NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build();
-      NamespaceDescriptor[] list = admin.listNamespaceDescriptors();
-      boolean exists = false;
-      for( NamespaceDescriptor nsd: list) {
-        if (nsd.getName().equals(ns.getName())) {
-          exists = true;
-          break;
-        }
-      }
-      if (!exists) {
-        admin.createNamespace(ns);
+    String namespaceName = tableName.getNamespaceAsString();
+    NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build();
+    NamespaceDescriptor[] list = admin.listNamespaceDescriptors();
+    boolean exists = false;
+    for (NamespaceDescriptor nsd : list) {
+      if (nsd.getName().equals(ns.getName())) {
+        exists = true;
+        break;
       }
+    }
+    if (!exists) {
+      admin.createNamespace(ns);
+    }
   }
 
   private void waitForSystemTable(Admin admin) throws IOException {
@@ -211,15 +212,13 @@ public final class BackupSystemTable implements Closeable {
       } catch (InterruptedException e) {
       }
       if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) {
-        throw new IOException("Failed to create backup system table after "+ TIMEOUT+"ms");
+        throw new IOException("Failed to create backup system table after " + TIMEOUT + "ms");
       }
     }
     LOG.debug("Backup table exists and available");
 
   }
 
-
-
   @Override
   public void close() {
     // do nothing
@@ -257,7 +256,7 @@ public final class BackupSystemTable implements Closeable {
         byte[] row = CellUtil.cloneRow(res.listCells().get(0));
         for (Cell cell : res.listCells()) {
           if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
-              BackupSystemTable.PATH_COL.length) == 0) {
+            BackupSystemTable.PATH_COL.length) == 0) {
             map.put(row, Bytes.toString(CellUtil.cloneValue(cell)));
           }
         }
@@ -286,13 +285,13 @@ public final class BackupSystemTable implements Closeable {
         String path = null;
         for (Cell cell : res.listCells()) {
           if (CellComparator.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0,
-              BackupSystemTable.TBL_COL.length) == 0) {
+            BackupSystemTable.TBL_COL.length) == 0) {
             tbl = TableName.valueOf(CellUtil.cloneValue(cell));
           } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
-              BackupSystemTable.FAM_COL.length) == 0) {
+            BackupSystemTable.FAM_COL.length) == 0) {
             fam = CellUtil.cloneValue(cell);
           } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
-              BackupSystemTable.PATH_COL.length) == 0) {
+            BackupSystemTable.PATH_COL.length) == 0) {
             path = Bytes.toString(CellUtil.cloneValue(cell));
           }
         }
@@ -313,9 +312,10 @@ public final class BackupSystemTable implements Closeable {
         }
         files.add(new Path(path));
         if (LOG.isDebugEnabled()) {
-          LOG.debug("found bulk loaded file : " + tbl + " " +  Bytes.toString(fam) + " " + path);
+          LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path);
         }
-      };
+      }
+      ;
       return mapForSrc;
     }
   }
@@ -359,16 +359,16 @@ public final class BackupSystemTable implements Closeable {
   public void writePathsPostBulkLoad(TableName tabName, byte[] region,
       Map<byte[], List<Path>> finalPaths) throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("write bulk load descriptor to backup " + tabName + " with " +
-          finalPaths.size() + " entries");
+      LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size()
+          + " entries");
     }
     try (Table table = connection.getTable(tableName)) {
-      List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region,
-          finalPaths);
+      List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths);
       table.put(puts);
       LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
     }
   }
+
   /*
    * For preCommitStoreFile() hook
    * @param tabName table name
@@ -376,15 +376,15 @@ public final class BackupSystemTable implements Closeable {
    * @param family column family
    * @param pairs list of paths for hfiles
    */
-  public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region,
-      final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
+  public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family,
+      final List<Pair<Path, Path>> pairs) throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("write bulk load descriptor to backup " + tabName + " with " +
-          pairs.size() + " entries");
+      LOG.debug("write bulk load descriptor to backup " + tabName + " with " + pairs.size()
+          + " entries");
     }
     try (Table table = connection.getTable(tableName)) {
-      List<Put> puts = BackupSystemTable.createPutForPreparedBulkload(tabName, region,
-          family, pairs);
+      List<Put> puts =
+          BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs);
       table.put(puts);
       LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
     }
@@ -411,11 +411,11 @@ public final class BackupSystemTable implements Closeable {
   /*
    * Reads the rows from backup table recording bulk loaded hfiles
    * @param tableList list of table names
-   * @return The keys of the Map are table, region and column family.
-   *  Value of the map reflects whether the hfile was recorded by preCommitStoreFile hook (true)
+   * @return The keys of the Map are table, region and column family. Value of the map reflects
+   * whether the hfile was recorded by preCommitStoreFile hook (true)
    */
   public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
-  readBulkloadRows(List<TableName> tableList) throws IOException {
+      readBulkloadRows(List<TableName> tableList) throws IOException {
     Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>();
     List<byte[]> rows = new ArrayList<>();
     for (TableName tTable : tableList) {
@@ -437,13 +437,13 @@ public final class BackupSystemTable implements Closeable {
             String rowStr = Bytes.toString(row);
             region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
             if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
-                BackupSystemTable.FAM_COL.length) == 0) {
+              BackupSystemTable.FAM_COL.length) == 0) {
               fam = Bytes.toString(CellUtil.cloneValue(cell));
             } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
-                BackupSystemTable.PATH_COL.length) == 0) {
+              BackupSystemTable.PATH_COL.length) == 0) {
               path = Bytes.toString(CellUtil.cloneValue(cell));
             } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0,
-                BackupSystemTable.STATE_COL.length) == 0) {
+              BackupSystemTable.STATE_COL.length) == 0) {
               byte[] state = CellUtil.cloneValue(cell);
               if (Bytes.equals(BackupSystemTable.BL_PREPARE, state)) {
                 raw = true;
@@ -484,12 +484,13 @@ public final class BackupSystemTable implements Closeable {
         Map<byte[], List<Path>> map = maps[idx];
         TableName tn = sTableList.get(idx);
         if (map == null) continue;
-        for (Map.Entry<byte[], List<Path>> entry: map.entrySet()) {
+        for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
           byte[] fam = entry.getKey();
           List<Path> paths = entry.getValue();
           for (Path p : paths) {
-            Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(),
-                backupId, ts, cnt++);
+            Put put =
+                BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId, ts,
+                  cnt++);
             puts.add(put);
           }
         }
@@ -564,18 +565,23 @@ public final class BackupSystemTable implements Closeable {
     }
   }
 
-  public void startBackupSession() throws IOException {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Start new backup session");
+  /**
+   * Exclusive operations are:
+   * create, delete, merge
+   * @throws IOException if another backup exclusive operation is already active
+   */
+  public void startBackupExclusiveOperation() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Start new backup exclusive operation");
     }
     try (Table table = connection.getTable(tableName)) {
       Put put = createPutForStartBackupSession();
-      //First try to put if row does not exist
+      // First try to put if row does not exist
       if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, null, put)) {
         // Row exists, try to put if value == ACTIVE_SESSION_NO
         if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL,
           ACTIVE_SESSION_NO, put)) {
-          throw new IOException("There is an active backup session");
+          throw new IOException("There is an active backup exclusive operation");
         }
       }
     }
@@ -587,17 +593,15 @@ public final class BackupSystemTable implements Closeable {
     return put;
   }
 
-  public void finishBackupSession() throws IOException
-  {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Stop backup session");
+  public void finishBackupExclusiveOperation() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Finish backup exclusive operation");
     }
     try (Table table = connection.getTable(tableName)) {
       Put put = createPutForStopBackupSession();
-      if(!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL,
-        ACTIVE_SESSION_YES, put))
-      {
-        throw new IOException("There is no active backup session");
+      if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL,
+        ACTIVE_SESSION_YES, put)) {
+        throw new IOException("There is no active backup exclusive operation");
       }
     }
   }
@@ -630,8 +634,7 @@ public final class BackupSystemTable implements Closeable {
         res.advance();
         Cell cell = res.current();
         byte[] row = CellUtil.cloneRow(cell);
-        String server =
-            getServerNameForReadRegionServerLastLogRollResult(row);
+        String server = getServerNameForReadRegionServerLastLogRollResult(row);
         byte[] data = CellUtil.cloneValue(cell);
         rsTimestampMap.put(server, Bytes.toLong(data));
       }
@@ -652,8 +655,7 @@ public final class BackupSystemTable implements Closeable {
       LOG.trace("write region server last roll log result to backup system table");
     }
     try (Table table = connection.getTable(tableName)) {
-      Put put =
-          createPutForRegionServerLastLogRollResult(server, ts, backupRoot);
+      Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot);
       table.put(put);
     }
   }
@@ -685,14 +687,15 @@ public final class BackupSystemTable implements Closeable {
 
   /**
    * Get first n backup history records
-   * @param n number of records
+   * @param n number of records; if n == -1, the max number
+   *        is ignored
    * @return list of records
    * @throws IOException
    */
   public List<BackupInfo> getHistory(int n) throws IOException {
 
     List<BackupInfo> history = getBackupHistory();
-    if (history.size() <= n) return history;
+    if (n == -1 || history.size() <= n) return history;
     List<BackupInfo> list = new ArrayList<BackupInfo>();
     for (int i = 0; i < n; i++) {
       list.add(history.get(i));
@@ -703,7 +706,8 @@ public final class BackupSystemTable implements Closeable {
 
   /**
    * Get backup history records filtered by list of filters.
-   * @param n max number of records
+   * @param n max number of records; if n == -1, then the max number
+   *        is ignored
    * @param filters list of filters
    * @return backup records
    * @throws IOException
@@ -714,7 +718,7 @@ public final class BackupSystemTable implements Closeable {
     List<BackupInfo> history = getBackupHistory();
     List<BackupInfo> result = new ArrayList<BackupInfo>();
     for (BackupInfo bi : history) {
-      if (result.size() == n) break;
+      if (n >= 0 && result.size() == n) break;
       boolean passed = true;
       for (int i = 0; i < filters.length; i++) {
         if (!filters[i].apply(bi)) {
@@ -852,9 +856,7 @@ public final class BackupSystemTable implements Closeable {
     List<Put> puts = new ArrayList<Put>();
     for (TableName table : tables) {
       byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray();
-      Put put =
-          createPutForWriteRegionServerLogTimestamp(table, smapData,
-            backupRoot);
+      Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot);
       puts.add(put);
     }
     try (Table table = connection.getTable(tableName)) {
@@ -1018,8 +1020,7 @@ public final class BackupSystemTable implements Closeable {
       }
     }
     try (Table table = connection.getTable(tableName)) {
-      List<Put> puts =
-          createPutsForAddWALFiles(files, backupId, backupRoot);
+      List<Put> puts = createPutsForAddWALFiles(files, backupId, backupRoot);
       table.put(puts);
     }
   }
@@ -1087,6 +1088,7 @@ public final class BackupSystemTable implements Closeable {
    * @param file name of a file to check
    * @return true, if deletable, false otherwise.
    * @throws IOException exception
+   * TODO: multiple backup destination support
    */
   public boolean isWALFileDeletable(String file) throws IOException {
     if (LOG.isTraceEnabled()) {
@@ -1271,12 +1273,12 @@ public final class BackupSystemTable implements Closeable {
       if (disjoint.length > 0 && disjoint.length != tables.length) {
         Put put = createPutForBackupSet(name, disjoint);
         table.put(put);
-      } else if(disjoint.length == tables.length) {
+      } else if (disjoint.length == tables.length) {
         LOG.warn("Backup set '" + name + "' does not contain tables ["
             + StringUtils.join(toRemove, " ") + "]");
       } else { // disjoint.length == 0 and tables.length >0
-        // Delete  backup set
-        LOG.info("Backup set '"+name+"' is empty. Deleting.");
+        // Delete backup set
+        LOG.info("Backup set '" + name + "' is empty. Deleting.");
         deleteBackupSet(name);
       }
     } finally {
@@ -1356,7 +1358,7 @@ public final class BackupSystemTable implements Closeable {
   }
 
   public static String getSnapshotName(Configuration conf) {
-    return "snapshot_"+getTableNameAsString(conf).replace(":", "_");
+    return "snapshot_" + getTableNameAsString(conf).replace(":", "_");
   }
 
   /**
@@ -1589,17 +1591,16 @@ public final class BackupSystemTable implements Closeable {
       for (Path path : entry.getValue()) {
         String file = path.toString();
         int lastSlash = file.lastIndexOf("/");
-        String filename = file.substring(lastSlash+1);
-        Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
-            Bytes.toString(region), BLK_LD_DELIM, filename));
+        String filename = file.substring(lastSlash + 1);
+        Put put =
+            new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
+              Bytes.toString(region), BLK_LD_DELIM, filename));
         put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
         put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
-        put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL,
-            file.getBytes());
+        put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes());
         put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT);
         puts.add(put);
-        LOG.debug("writing done bulk path " + file + " for " + table + " " +
-            Bytes.toString(region));
+        LOG.debug("writing done bulk path " + file + " for " + table + " " + Bytes.toString(region));
       }
     }
     return puts;
@@ -1607,19 +1608,16 @@ public final class BackupSystemTable implements Closeable {
 
   public static void snapshot(Connection conn) throws IOException {
 
-    try (Admin admin = conn.getAdmin();){
+    try (Admin admin = conn.getAdmin();) {
       Configuration conf = conn.getConfiguration();
-      admin.snapshot(BackupSystemTable.getSnapshotName(conf),
-        BackupSystemTable.getTableName(conf));
+      admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf));
     }
   }
 
-  public static void restoreFromSnapshot(Connection conn)
-      throws IOException {
+  public static void restoreFromSnapshot(Connection conn) throws IOException {
 
     Configuration conf = conn.getConfiguration();
-    LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) +
-        " from snapshot");
+    LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot");
     try (Admin admin = conn.getAdmin();) {
       String snapshotName = BackupSystemTable.getSnapshotName(conf);
       if (snapshotExists(admin, snapshotName)) {
@@ -1631,8 +1629,8 @@ public final class BackupSystemTable implements Closeable {
         // Snapshot does not exists, i.e completeBackup failed after
         // deleting backup system table snapshot
         // In this case we log WARN and proceed
-        LOG.warn("Could not restore backup system table. Snapshot " + snapshotName+
-          " does not exists.");
+        LOG.warn("Could not restore backup system table. Snapshot " + snapshotName
+            + " does not exists.");
       }
     }
   }
@@ -1640,7 +1638,7 @@ public final class BackupSystemTable implements Closeable {
   protected static boolean snapshotExists(Admin admin, String snapshotName) throws IOException {
 
     List<SnapshotDescription> list = admin.listSnapshots();
-    for (SnapshotDescription desc: list) {
+    for (SnapshotDescription desc : list) {
       if (desc.getName().equals(snapshotName)) {
         return true;
       }
@@ -1648,26 +1646,25 @@ public final class BackupSystemTable implements Closeable {
     return false;
   }
 
-  public static boolean snapshotExists (Connection conn) throws IOException {
+  public static boolean snapshotExists(Connection conn) throws IOException {
     return snapshotExists(conn.getAdmin(), getSnapshotName(conn.getConfiguration()));
   }
 
-  public static void deleteSnapshot(Connection conn)
-      throws IOException {
+  public static void deleteSnapshot(Connection conn) throws IOException {
 
     Configuration conf = conn.getConfiguration();
-    LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) +
-        " from the system");
+    LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system");
     try (Admin admin = conn.getAdmin();) {
       String snapshotName = BackupSystemTable.getSnapshotName(conf);
       if (snapshotExists(admin, snapshotName)) {
         admin.deleteSnapshot(snapshotName);
         LOG.debug("Done deleting backup system table snapshot");
       } else {
-        LOG.error("Snapshot "+snapshotName+" does not exists");
+        LOG.error("Snapshot " + snapshotName + " does not exists");
       }
     }
   }
+
   /*
    * Creates Put's for bulk load resulting from running LoadIncrementalHFiles
    */
@@ -1678,17 +1675,16 @@ public final class BackupSystemTable implements Closeable {
       Path path = pair.getSecond();
       String file = path.toString();
       int lastSlash = file.lastIndexOf("/");
-      String filename = file.substring(lastSlash+1);
-      Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
-          Bytes.toString(region), BLK_LD_DELIM, filename));
+      String filename = file.substring(lastSlash + 1);
+      Put put =
+          new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, Bytes.toString(region),
+            BLK_LD_DELIM, filename));
       put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
       put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family);
-      put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL,
-          file.getBytes());
+      put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes());
       put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE);
       puts.add(put);
-      LOG.debug("writing raw bulk path " + file + " for " + table + " " +
-          Bytes.toString(region));
+      LOG.debug("writing raw bulk path " + file + " for " + table + " " + Bytes.toString(region));
     }
     return puts;
   }
@@ -1725,7 +1721,6 @@ public final class BackupSystemTable implements Closeable {
     return get;
   }
 
-
   public void startDeleteOperation(String[] backupIdList) throws IOException {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList));
@@ -1765,6 +1760,96 @@ public final class BackupSystemTable implements Closeable {
     }
   }
 
+  /** Builds the Put that stores the comma-joined backup ids in FAM_COL of the merge row. */
+  private Put createPutForMergeOperation(String[] backupIdList) {
+    String joinedIds = StringUtils.join(backupIdList, ",");
+    Put mergeRowPut = new Put(MERGE_OP_ROW);
+    mergeRowPut.addColumn(META_FAMILY, FAM_COL, Bytes.toBytes(joinedIds));
+    return mergeRowPut;
+  }
+
+  /**
+   * Tells whether the merge row currently exists in the backup system table.
+   * @return true if a Get of MERGE_OP_ROW returns a non-empty result
+   */
+  public boolean isMergeInProgress() throws IOException {
+    try (Table table = connection.getTable(tableName)) {
+      Result mergeRow = table.get(new Get(MERGE_OP_ROW));
+      return !mergeRow.isEmpty();
+    }
+  }
+
+  /** Builds the Put that stores the comma-joined table names in PATH_COL of the merge row. */
+  private Put createPutForUpdateTablesForMerge(List<TableName> tables) {
+    String joinedTables = StringUtils.join(tables, ",");
+    Put mergeRowPut = new Put(MERGE_OP_ROW);
+    mergeRowPut.addColumn(META_FAMILY, PATH_COL, Bytes.toBytes(joinedTables));
+    return mergeRowPut;
+  }
+
+  /**
+   * Builds the Delete that removes the whole META_FAMILY of the merge row.
+   */
+  private Delete createDeleteForBackupMergeOperation() {
+    return new Delete(MERGE_OP_ROW).addFamily(META_FAMILY);
+  }
+
+  /**
+   * Builds the Get that reads the whole META_FAMILY of the merge row.
+   */
+  private Get createGetForMergeOperation() {
+    return new Get(MERGE_OP_ROW).addFamily(META_FAMILY);
+  }
+
+  /** Marks a merge as started by storing the comma-joined backup ids in the merge row. */
+  public void startMergeOperation(String[] backupIdList) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList, ","));
+    }
+    try (Table table = connection.getTable(tableName)) {
+      table.put(createPutForMergeOperation(backupIdList)); // stored with the same "," separator
+    }
+  }
+
+  /** Stores the given tables, comma-joined, in PATH_COL of the merge row. */
+  public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Update tables for merge : " + StringUtils.join(tables, ","));
+    }
+    try (Table table = connection.getTable(tableName)) {
+      table.put(createPutForUpdateTablesForMerge(tables));
+    }
+  }
+
+  /** Ends the merge session by deleting the META_FAMILY cells of the merge row. */
+  public void finishMergeOperation() throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Finish merge operation");
+    }
+    try (Table table = connection.getTable(tableName)) {
+      table.delete(createDeleteForBackupMergeOperation());
+    }
+  }
+
+  /**
+   * Returns the backup ids recorded by {@link #startMergeOperation(String[])}.
+   * @return ids of the backups being merged, or null if no id list is stored in the merge row
+   * @throws IOException if the backup system table cannot be read
+   */
+  public String[] getListOfBackupIdsFromMergeOperation() throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Get backup ids for merge operation");
+    }
+    try (Table table = connection.getTable(tableName)) {
+      Result res = table.get(createGetForMergeOperation());
+      // Read FAM_COL explicitly: the row may also carry PATH_COL (processed tables), so the
+      // first cell need not be the id list. getValue yields null when the cell is absent.
+      byte[] val = res.getValue(META_FAMILY, FAM_COL);
+      // Decode with Bytes.toString to mirror the Bytes.toBytes(String) used when writing.
+      return val == null || val.length == 0 ? null : Bytes.toString(val).split(",");
+    }
+  }
+
   static Scan createScanForOrigBulkLoadedFiles(TableName table) throws IOException {
     Scan scan = new Scan();
     byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM);
@@ -1776,10 +1861,12 @@ public final class BackupSystemTable implements Closeable {
     scan.setMaxVersions(1);
     return scan;
   }
+
   static String getTableNameFromOrigBulkLoadRow(String rowStr) {
     String[] parts = rowStr.split(BLK_LD_DELIM);
     return parts[1];
   }
+
   static String getRegionNameFromOrigBulkLoadRow(String rowStr) {
     // format is bulk : namespace : table : region : file
     String[] parts = rowStr.split(BLK_LD_DELIM);
@@ -1791,6 +1878,7 @@ public final class BackupSystemTable implements Closeable {
     LOG.debug("bulk row string " + rowStr + " region " + parts[idx]);
     return parts[idx];
   }
+
   /*
    * Used to query bulk loaded hfiles which have been copied by incremental backup
    * @param backupId the backup Id. It can be null when querying for all tables
@@ -1798,13 +1886,14 @@ public final class BackupSystemTable implements Closeable {
    */
   static Scan createScanForBulkLoadedFiles(String backupId) throws IOException {
     Scan scan = new Scan();
-    byte[] startRow = backupId == null ? BULK_LOAD_PREFIX_BYTES :
-      rowkey(BULK_LOAD_PREFIX, backupId+BLK_LD_DELIM);
+    byte[] startRow =
+        backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId
+            + BLK_LD_DELIM);
     byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
     stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
     scan.setStartRow(startRow);
     scan.setStopRow(stopRow);
-    //scan.setTimeRange(lower, Long.MAX_VALUE);
+    // scan.setTimeRange(lower, Long.MAX_VALUE);
     scan.addFamily(BackupSystemTable.META_FAMILY);
     scan.setMaxVersions(1);
     return scan;
@@ -1812,12 +1901,13 @@ public final class BackupSystemTable implements Closeable {
 
   static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId,
       long ts, int idx) {
-    Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId+BLK_LD_DELIM+ts+BLK_LD_DELIM+idx));
+    Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM + ts + BLK_LD_DELIM + idx));
     put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName());
     put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam);
     put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, p.getBytes());
     return put;
   }
+
   /**
    * Creates put list for list of WAL files
    * @param files list of WAL file paths
@@ -1825,8 +1915,9 @@ public final class BackupSystemTable implements Closeable {
    * @return put list
    * @throws IOException exception
    */
-  private List<Put> createPutsForAddWALFiles(List<String> files, String backupId,
-      String backupRoot) throws IOException {
+  private List<Put>
+      createPutsForAddWALFiles(List<String> files, String backupId, String backupRoot)
+          throws IOException {
 
     List<Put> puts = new ArrayList<Put>();
     for (String file : files) {
@@ -1957,5 +2048,4 @@ public final class BackupSystemTable implements Closeable {
     return sb.toString().getBytes();
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
index 381e9b1..ea7a7b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.HBackupFileSystem;
 import org.apache.hadoop.hbase.backup.RestoreRequest;
 import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
-import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreJob;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.backup.util.RestoreTool;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
@@ -58,7 +58,6 @@ public class RestoreTablesClient {
   private Configuration conf;
   private Connection conn;
   private String backupId;
-  private String fullBackupId;
   private TableName[] sTableArray;
   private TableName[] tTableArray;
   private String targetRootDir;
@@ -107,8 +106,7 @@ public class RestoreTablesClient {
 
     if (existTableList.size() > 0) {
       if (!isOverwrite) {
-        LOG.error("Existing table ("
-            + existTableList
+        LOG.error("Existing table (" + existTableList
             + ") found in the restore target, please add "
             + "\"-overwrite\" option in the command if you mean"
             + " to restore to these existing tables");
@@ -148,9 +146,8 @@ public class RestoreTablesClient {
     Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId);
     String lastIncrBackupId = images.length == 1 ? null : images[images.length - 1].getBackupId();
     // We need hFS only for full restore (see the code)
-    BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId);
+    BackupManifest manifest = HBackupFileSystem.getManifest(conf, backupRoot, backupId);
     if (manifest.getType() == BackupType.FULL) {
-      fullBackupId = manifest.getBackupImage().getBackupId();
       LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full" + " backup image "
           + tableBackupPath.toString());
       restoreTool.fullRestoreTable(conn, tableBackupPath, sTable, tTable, truncateIfExists,
@@ -169,8 +166,8 @@ public class RestoreTablesClient {
     // full backup path comes first
     for (int i = 1; i < images.length; i++) {
       BackupImage im = images[i];
-      String fileBackupDir = HBackupFileSystem.getTableBackupDir(im.getRootDir(),
-                  im.getBackupId(), sTable)+ Path.SEPARATOR+"data";
+      String fileBackupDir =
+          HBackupFileSystem.getTableBackupDataDir(im.getRootDir(), im.getBackupId(), sTable);
       dirList.add(new Path(fileBackupDir));
     }
 
@@ -196,8 +193,10 @@ public class RestoreTablesClient {
     TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>();
     boolean truncateIfExists = isOverwrite;
     Set<String> backupIdSet = new HashSet<>();
+
     for (int i = 0; i < sTableArray.length; i++) {
       TableName table = sTableArray[i];
+
       BackupManifest manifest = backupManifestMap.get(table);
       // Get the image list of this backup for restore in time order from old
       // to new.
@@ -213,11 +212,8 @@ public class RestoreTablesClient {
       if (restoreImageSet != null && !restoreImageSet.isEmpty()) {
         LOG.info("Restore includes the following image(s):");
         for (BackupImage image : restoreImageSet) {
-          LOG.info("Backup: "
-              + image.getBackupId()
-              + " "
-              + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(),
-                  table));
+          LOG.info("Backup: " + image.getBackupId() + " "
+              + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), table));
           if (image.getType() == BackupType.INCREMENTAL) {
             backupIdSet.add(image.getBackupId());
             LOG.debug("adding " + image.getBackupId() + " for bulk load");
@@ -232,13 +228,13 @@ public class RestoreTablesClient {
         Map<byte[], List<Path>>[] mapForSrc = table.readBulkLoadedFiles(id, sTableList);
         Map<LoadQueueItem, ByteBuffer> loaderResult;
         conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
-        LoadIncrementalHFiles loader = MapReduceRestoreJob.createLoader(conf);
+        LoadIncrementalHFiles loader = BackupUtils.createLoader(conf);
         for (int i = 0; i < sTableList.size(); i++) {
           if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) {
             loaderResult = loader.run(null, mapForSrc[i], tTableArray[i]);
             LOG.debug("bulk loading " + sTableList.get(i) + " to " + tTableArray[i]);
             if (loaderResult.isEmpty()) {
-              String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " +tTableArray[i];
+              String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " + tTableArray[i];
               LOG.error(msg);
               throw new IOException(msg);
             }
@@ -253,7 +249,7 @@ public class RestoreTablesClient {
     if (backupId == null) {
       return 0;
     }
-    return Long.parseLong(backupId.substring(backupId.lastIndexOf("_")+1));
+    return Long.parseLong(backupId.substring(backupId.lastIndexOf("_") + 1));
   }
 
   static boolean withinRange(long a, long lower, long upper) {
@@ -268,15 +264,15 @@ public class RestoreTablesClient {
     // case VALIDATION:
     // check the target tables
     checkTargetTables(tTableArray, isOverwrite);
+
     // case RESTORE_IMAGES:
     HashMap<TableName, BackupManifest> backupManifestMap = new HashMap<>();
     // check and load backup image manifest for the tables
     Path rootPath = new Path(targetRootDir);
     HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath,
       backupId);
+
     restore(backupManifestMap, sTableArray, tTableArray, isOverwrite);
   }
 
-
-
 }


[3/4] hbase git commit: HBASE-14135 Merge backup images (Vladimir Rodionov)

Posted by el...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
deleted file mode 100644
index ba1b65e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.backup.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
-import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * A tool to split HFiles into new region boundaries as a MapReduce job. The tool generates HFiles
- * for later bulk importing.
- */
-@InterfaceAudience.Private
-public class HFileSplitterJob extends Configured implements Tool {
-  private static final Log LOG = LogFactory.getLog(HFileSplitterJob.class);
-  final static String NAME = "HFileSplitterJob";
-  public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output";
-  public final static String TABLES_KEY = "hfile.input.tables";
-  public final static String TABLE_MAP_KEY = "hfile.input.tablesmap";
-  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
-
-  public HFileSplitterJob() {
-  }
-
-  protected HFileSplitterJob(final Configuration c) {
-    super(c);
-  }
-
-  /**
-   * A mapper that just writes out cells. This one can be used together with
-   * {@link KeyValueSortReducer}
-   */
-  static class HFileCellMapper extends
-      Mapper<NullWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
-
-    @Override
-    public void map(NullWritable key, KeyValue value, Context context) throws IOException,
-        InterruptedException {
-      // Convert value to KeyValue if subclass
-      if (!value.getClass().equals(KeyValue.class)) {
-        value =
-            new KeyValue(value.getRowArray(), value.getRowOffset(), value.getRowLength(),
-                value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(),
-                value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(),
-                value.getTimestamp(), Type.codeToType(value.getTypeByte()), value.getValueArray(),
-                value.getValueOffset(), value.getValueLength());
-      }
-      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value);
-    }
-
-    @Override
-    public void setup(Context context) throws IOException {
-      // do nothing
-    }
-  }
-
-  /**
-   * Sets up the actual job.
-   * @param args The command line parameters.
-   * @return The newly created job.
-   * @throws IOException When setting up the job fails.
-   */
-  public Job createSubmittableJob(String[] args) throws IOException {
-    Configuration conf = getConf();
-    String inputDirs = args[0];
-    String tabName = args[1];
-    conf.setStrings(TABLES_KEY, tabName);
-    conf.set(FileInputFormat.INPUT_DIR, inputDirs);
-    Job job =
-        Job.getInstance(conf,
-          conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
-    job.setJarByClass(HFileSplitterJob.class);
-    job.setInputFormatClass(HFileInputFormat.class);
-    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
-    if (hfileOutPath != null) {
-      LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
-      TableName tableName = TableName.valueOf(tabName);
-      job.setMapperClass(HFileCellMapper.class);
-      job.setReducerClass(KeyValueSortReducer.class);
-      Path outputDir = new Path(hfileOutPath);
-      FileOutputFormat.setOutputPath(job, outputDir);
-      job.setMapOutputValueClass(KeyValue.class);
-      try (Connection conn = ConnectionFactory.createConnection(conf);
-          Table table = conn.getTable(tableName);
-          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
-        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
-      }
-      LOG.debug("success configuring load incremental job");
-
-      TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
-        org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
-    } else {
-      throw new IOException("No bulk output directory specified");
-    }
-    return job;
-  }
-
-  /**
-   * Print usage
-   * @param errorMsg Error message. Can be null.
-   */
-  private void usage(final String errorMsg) {
-    if (errorMsg != null && errorMsg.length() > 0) {
-      System.err.println("ERROR: " + errorMsg);
-    }
-    System.err.println("Usage: " + NAME + " [options] <HFile inputdir(s)> <table>");
-    System.err.println("Read all HFile's for <table> and split them to <table> region boundaries.");
-    System.err.println("<table>  table to load.\n");
-    System.err.println("To generate HFiles for a bulk data load, pass the option:");
-    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
-    System.err.println("Other options:");
-    System.err.println("   -D " + JOB_NAME_CONF_KEY
-        + "=jobName - use the specified mapreduce job name for the HFile splitter");
-    System.err.println("For performance also consider the following options:\n"
-        + "  -Dmapreduce.map.speculative=false\n" + "  -Dmapreduce.reduce.speculative=false");
-  }
-
-  /**
-   * Main entry point.
-   * @param args The command line parameters.
-   * @throws Exception When running the job fails.
-   */
-  public static void main(String[] args) throws Exception {
-    int ret = ToolRunner.run(new HFileSplitterJob(HBaseConfiguration.create()), args);
-    System.exit(ret);
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    if (args.length < 2) {
-      usage("Wrong number of arguments: " + args.length);
-      System.exit(-1);
-    }
-    Job job = createSubmittableJob(args);
-    int result = job.waitForCompletion(true) ? 0 : 1;
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
new file mode 100644
index 0000000..00c5b83
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupMergeJob;
+import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.util.Tool;
+
+/**
+ * MapReduce implementation of {@link BackupMergeJob}.
+ * Must be initialized with the configuration of the backup destination cluster.
+ *
+ */
+
+@InterfaceAudience.Private
+public class MapReduceBackupMergeJob implements BackupMergeJob {
+  public static final Log LOG = LogFactory.getLog(MapReduceBackupMergeJob.class);
+
+  protected Tool player;
+  protected Configuration conf;
+
+  public MapReduceBackupMergeJob() {
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Merges the given backup images: runs the HFile splitter job once per table, moves each
+   * result into the most recent image, then deletes the other images and updates the manifest.
+   * @param backupIds ids of the images to merge; the backup root is read from backupIds[0]
+   * @throws IOException if a step fails; the message says whether to retry or to run repair
+   */
+  @Override
+  public void run(String[] backupIds) throws IOException {
+    String bulkOutputConfKey;
+    // TODO : run player on remote cluster
+    player = new MapReduceHFileSplitterJob();
+    bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
+    // Player reads all files in arbitrary directory structure and creates
+    // a Map task for each file
+    String bids = StringUtils.join(backupIds, ",");
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Merge backup images " + bids);
+    }
+    // Tables merged so far, each paired with the bulk output dir that holds its merged files.
+    List<Pair<TableName, Path>> processedTableList = new ArrayList<Pair<TableName, Path>>();
+    boolean finishedTables = false;
+    Connection conn = ConnectionFactory.createConnection(getConf());
+    BackupSystemTable table = new BackupSystemTable(conn);
+    FileSystem fs = FileSystem.get(getConf());
+    // NOTE(review): conn is not closed if new BackupSystemTable(conn) or FileSystem.get throws.
+    try {
+      // NOTE(review): a failed lock attempt still reaches the release calls in catch - confirm.
+      // Get exclusive lock on backup system
+      table.startBackupExclusiveOperation();
+      // Start merge operation
+      table.startMergeOperation(backupIds);
+
+      // Select most recent backup id
+      String mergedBackupId = findMostRecentBackupId(backupIds);
+
+      TableName[] tableNames = getTableNamesInBackupImages(backupIds);
+      String backupRoot = null;
+      BackupInfo bInfo = table.readBackupInfo(backupIds[0]);
+      backupRoot = bInfo.getBackupRootDir();
+
+      for (int i = 0; i < tableNames.length; i++) {
+        LOG.info("Merge backup images for " + tableNames[i]);
+        // Find input directories for table
+        Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
+        String dirs = StringUtils.join(dirPaths, ",");
+        Path bulkOutputPath =
+            BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(tableNames[i]),
+              getConf(), false);
+        // Delete content if exists
+        if (fs.exists(bulkOutputPath)) {
+          if (!fs.delete(bulkOutputPath, true)) {
+            LOG.warn("Can not delete: " + bulkOutputPath);
+          }
+        }
+        Configuration conf = getConf();
+        conf.set(bulkOutputConfKey, bulkOutputPath.toString());
+        String[] playerArgs = { dirs, tableNames[i].getNameAsString() };
+        // conf above is the very object getConf() returns, so the player sees the output key.
+        int result = 0;
+        player.setConf(getConf());
+        result = player.run(playerArgs);
+        if (!succeeded(result)) {
+          throw new IOException("Can not merge backup images for " + dirs
+              + " (check Hadoop/MR and HBase logs). Player return code =" + result);
+        }
+        // Add to processed table list
+        processedTableList.add(new Pair<TableName, Path>(tableNames[i], bulkOutputPath));
+        LOG.debug("Merge Job finished:" + result);
+      }
+      List<TableName> tableList = toTableNameList(processedTableList);
+      table.updateProcessedTablesForMerge(tableList);
+      finishedTables = true;
+
+      // Move data
+      for (Pair<TableName, Path> tn : processedTableList) {
+        moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
+      }
+
+      // Delete old data and update manifest
+      List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
+      deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
+      updateBackupManifest(backupRoot, mergedBackupId, backupsToDelete);
+      // Finish merge session
+      table.finishMergeOperation();
+      // Release lock
+      table.finishBackupExclusiveOperation();
+    } catch (RuntimeException e) {
+      // Rethrown untouched: this path releases neither the merge marker nor the exclusive lock.
+      throw e;
+    } catch (Exception e) {
+      LOG.error(e);
+      if (!finishedTables) {
+        // cleanup bulk directories and finish merge
+        // merge MUST be repeated (no need for repair)
+        cleanupBulkLoadDirs(fs, toPathList(processedTableList));
+        table.finishMergeOperation();
+        table.finishBackupExclusiveOperation();
+        throw new IOException("Backup merge operation failed, you should try it again", e);
+      } else {
+        // backup repair must be run
+        throw new IOException(
+            "Backup merge operation failed, run backup repair tool to restore system's integrity",
+            e);
+      }
+    } finally {
+      table.close();
+      conn.close();
+    }
+  }
+
+  protected List<Path> toPathList(List<Pair<TableName, Path>> processedTableList) {
+    List<Path> paths = new ArrayList<Path>(processedTableList.size());
+    for (Pair<TableName, Path> entry : processedTableList) {
+      paths.add(entry.getSecond()); // second element: the bulk output path
+    }
+    return paths; // same order as processedTableList
+  }
+
+  protected List<TableName> toTableNameList(List<Pair<TableName, Path>> processedTableList) {
+    List<TableName> names = new ArrayList<TableName>(processedTableList.size());
+    for (Pair<TableName, Path> entry : processedTableList) {
+      names.add(entry.getFirst()); // first element: the table name
+    }
+    return names; // same order as processedTableList
+  }
+
+  /** Recursively deletes each given path; a delete that returns false is only logged. */
+  protected void cleanupBulkLoadDirs(FileSystem fs, List<Path> pathList) throws IOException {
+    for (Path bulkDir : pathList) {
+      if (!fs.delete(bulkDir, true)) {
+        LOG.warn("Can't delete " + bulkDir);
+      }
+    }
+  }
+
+  /**
+   * Drops the deleted backups from the ancestors of the merged image and stores its manifest.
+   */
+  protected void updateBackupManifest(String backupRoot, String mergedBackupId,
+      List<String> backupsToDelete) throws IllegalArgumentException, IOException {
+    Path rootPath = new Path(backupRoot);
+    BackupManifest merged = HBackupFileSystem.getManifest(conf, rootPath, mergedBackupId);
+    merged.getBackupImage().removeAncestors(backupsToDelete);
+    merged.store(conf);
+  }
+
+  /**
+   * Removes the given backups: first their records in the backup system table, then their
+   * directories under backupRoot. A directory delete that returns false is only logged.
+   */
+  protected void deleteBackupImages(List<String> backupIds, Connection conn, FileSystem fs,
+      String backupRoot) throws IOException {
+    try (BackupSystemTable sysTable = new BackupSystemTable(conn)) {
+      for (String id : backupIds) {
+        sysTable.deleteBackupInfo(id);
+      }
+    }
+    // The directories are removed only after all records have been deleted.
+    for (String id : backupIds) {
+      Path imageDir = HBackupFileSystem.getBackupPath(backupRoot, id);
+      if (!fs.delete(imageDir, true)) {
+        LOG.warn("Could not delete " + imageDir);
+      }
+    }
+  }
+
+  /** Returns every id of backupIds, in order, except those equal to mergedBackupId. */
+  protected List<String> getBackupIdsToDelete(String[] backupIds, String mergedBackupId) {
+    List<String> toDelete = new ArrayList<String>();
+    for (String id : backupIds) {
+      if (!id.equals(mergedBackupId)) {
+        toDelete.add(id);
+      }
+    }
+    return toDelete;
+  }
+
+  /** Replaces the table's data dir in the merged image with the bulk output dir of the merge. */
+  protected void moveData(FileSystem fs, String backupRoot, Path bulkOutputPath, TableName tableName,
+      String mergedBackupId) throws IllegalArgumentException, IOException {
+    Path dest =
+        new Path(HBackupFileSystem.getTableBackupDataDir(backupRoot, mergedBackupId, tableName));
+    // delete() returns false for a missing path as well; only a failed removal is an error.
+    if (fs.exists(dest) && !fs.delete(dest, true)) {
+      throw new IOException("Could not delete " + dest);
+    }
+    for (FileStatus fst : fs.listStatus(bulkOutputPath)) {
+      if (fst.isDirectory()) {
+        // Every child has the same parent (the listed output dir), so it is moved exactly once.
+        if (!fs.rename(fst.getPath().getParent(), dest)) {
+          throw new IOException("Could not move " + bulkOutputPath + " to " + dest);
+        }
+        return; // the source images are deleted next, so a failed move must not pass silently
+      }
+    }
+  }
+
+  /**
+   * Parses the second "_"-separated token of each id as a long and returns BACKUPID_PREFIX
+   * followed by the largest of these numbers.
+   */
+  protected String findMostRecentBackupId(String[] backupIds) {
+    long newest = Long.MIN_VALUE;
+    for (String id : backupIds) {
+      newest = Math.max(newest, Long.parseLong(id.split("_")[1]));
+    }
+    return BackupRestoreConstants.BACKUPID_PREFIX + newest;
+  }
+
+  /**
+   * Collects the distinct tables of all given backups, as recorded in the backup system table.
+   * @return the union of the table names of every BackupInfo, in no particular order
+   */
+  protected TableName[] getTableNamesInBackupImages(String[] backupIds) throws IOException {
+    Set<TableName> distinctTables = new HashSet<TableName>();
+    // A dedicated connection is created from conf and closed when this block exits.
+    try (Connection conn = ConnectionFactory.createConnection(conf);
+        BackupSystemTable sysTable = new BackupSystemTable(conn)) {
+      for (String id : backupIds) {
+        BackupInfo info = sysTable.readBackupInfo(id);
+        distinctTables.addAll(info.getTableNames());
+      }
+    }
+    return distinctTables.toArray(new TableName[distinctTables.size()]);
+  }
+
+  /**
+   * Lists the data directories of tableName that exist among the given backup images.
+   * @return the existing directories in backupIds order; missing ones are skipped (trace log)
+   */
+  protected Path[] findInputDirectories(FileSystem fs, String backupRoot, TableName tableName,
+      String[] backupIds) throws IOException {
+    List<Path> existing = new ArrayList<Path>();
+    for (String id : backupIds) {
+      Path dataDir = new Path(HBackupFileSystem.getTableBackupDataDir(backupRoot, id, tableName));
+      if (fs.exists(dataDir)) {
+        existing.add(dataDir);
+        continue;
+      }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("File: " + dataDir + " does not exist.");
+      }
+    }
+    return existing.toArray(new Path[existing.size()]);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
new file mode 100644
index 0000000..49e8c75
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A tool to split HFiles into new region boundaries as a MapReduce job. The tool generates HFiles
+ * for later bulk importing.
+ */
+@InterfaceAudience.Private
+public class MapReduceHFileSplitterJob extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(MapReduceHFileSplitterJob.class);
+  final static String NAME = "HFileSplitterJob"; // used in the usage text and default job name
+  public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output"; // required output dir
+  public final static String TABLES_KEY = "hfile.input.tables"; // set to the <table> argument
+  public final static String TABLE_MAP_KEY = "hfile.input.tablesmap"; // not read in this class
+  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; // optional job name
+
+  /**
+   * Creates the tool without a configuration. Callers are expected to provide one through
+   * {@code setConf} before running it, because the job setup reads {@code getConf()}.
+   */
+  public MapReduceHFileSplitterJob() {
+    super();
+  }
+
+  /**
+   * Creates the tool on top of the given configuration.
+   * @param c configuration handed to {@link Configured}
+   */
+  protected MapReduceHFileSplitterJob(final Configuration c) {
+    super(c);
+  }
+
+  /**
+   * Mapper that emits every input cell unchanged, keyed by a copy of its row. It is meant to be
+   * paired with {@link KeyValueSortReducer}.
+   */
+  static class HFileCellMapper extends
+      Mapper<NullWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
+
+    /** Intentionally empty: this mapper needs no per-task initialization. */
+    @Override
+    public void setup(Context context) throws IOException {
+      // do nothing
+    }
+
+    /**
+     * Writes the cell to the output under a key holding a copy of the cell's row.
+     * @param key not used
+     * @param value cell to emit
+     * @param context task context the (row, cell) pair is written to
+     */
+    @Override
+    public void map(NullWritable key, KeyValue value, Context context) throws IOException,
+        InterruptedException {
+      KeyValue cell = value;
+      if (cell.getClass() != KeyValue.class) {
+        // Instance of a KeyValue subclass: rebuild a plain KeyValue from its row, family,
+        // qualifier, timestamp, type and value.
+        cell =
+            new KeyValue(value.getRowArray(), value.getRowOffset(), value.getRowLength(),
+                value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(),
+                value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(),
+                value.getTimestamp(), Type.codeToType(value.getTypeByte()), value.getValueArray(),
+                value.getValueOffset(), value.getValueLength());
+      }
+      ImmutableBytesWritable rowKey = new ImmutableBytesWritable(CellUtil.cloneRow(cell));
+      context.write(rowKey, cell);
+    }
+  }
+
+  /**
+   * Builds the MapReduce job that reads the input HFiles and writes new HFiles for the target
+   * table into the bulk output directory.
+   * @param args command line parameters: {@code args[0]} is the input directory list,
+   *          {@code args[1]} is the table name
+   * @return the configured job, not yet submitted
+   * @throws IOException if no directory is configured under {@link #BULK_OUTPUT_CONF_KEY} or
+   *           setting up the job fails
+   */
+  public Job createSubmittableJob(String[] args) throws IOException {
+    Configuration conf = getConf();
+    final String inputDirs = args[0];
+    final String tabName = args[1];
+    conf.setStrings(TABLES_KEY, tabName);
+    conf.set(FileInputFormat.INPUT_DIR, inputDirs);
+
+    // An explicitly configured job name wins over the generated "<NAME>_<timestamp>" one.
+    String jobName =
+        conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime());
+    Job job = Job.getInstance(conf, jobName);
+    job.setJarByClass(MapReduceHFileSplitterJob.class);
+    job.setInputFormatClass(HFileInputFormat.class);
+    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+
+    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+    if (hfileOutPath == null) {
+      throw new IOException("No bulk output directory specified");
+    }
+
+    LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
+    TableName tableName = TableName.valueOf(tabName);
+    job.setMapperClass(HFileCellMapper.class);
+    job.setReducerClass(KeyValueSortReducer.class);
+    Path outputDir = new Path(hfileOutPath);
+    FileOutputFormat.setOutputPath(job, outputDir);
+    job.setMapOutputValueClass(KeyValue.class);
+    // The table descriptor and region locator of the target table drive the output format setup.
+    try (Connection conn = ConnectionFactory.createConnection(conf);
+        Table table = conn.getTable(tableName);
+        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+      HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
+    }
+    LOG.debug("success configuring load incremental job");
+
+    TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
+      org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
+    return job;
+  }
+
+  /**
+   * Prints the optional error message followed by the command line help to standard error.
+   * @param errorMsg error message printed first; {@code null} or empty prints only the help
+   */
+  private void usage(final String errorMsg) {
+    if (errorMsg != null && !errorMsg.isEmpty()) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    // One entry per println call; embedded "\n" keeps the spacing of the help text.
+    String[] help = {
+        "Usage: " + NAME + " [options] <HFile inputdir(s)> <table>",
+        "Read all HFile's for <table> and split them to <table> region boundaries.",
+        "<table>  table to load.\n",
+        "To generate HFiles for a bulk data load, pass the option:",
+        "  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output",
+        "Other options:",
+        "   -D " + JOB_NAME_CONF_KEY
+            + "=jobName - use the specified mapreduce job name for the HFile splitter",
+        "For performance also consider the following options:\n"
+            + "  -Dmapreduce.map.speculative=false\n" + "  -Dmapreduce.reduce.speculative=false" };
+    for (String line : help) {
+      System.err.println(line);
+    }
+  }
+
+  /**
+   * Command line entry point: runs the tool through {@link ToolRunner} on a fresh HBase
+   * configuration and exits the JVM with the tool's return code.
+   * @param args The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    Tool splitter = new MapReduceHFileSplitterJob(HBaseConfiguration.create());
+    System.exit(ToolRunner.run(splitter, args));
+  }
+
+  /**
+   * Creates the splitter job from the arguments, submits it and waits for its completion.
+   * @param args command line parameters: the input directory list followed by the table name
+   * @return 0 if the job succeeded, 1 if it failed, -1 if fewer than two arguments were given
+   * @throws Exception When creating or running the job fails.
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      usage("Wrong number of arguments: " + args.length);
+      // Report the problem through the return code instead of calling System.exit(): this tool
+      // is also invoked in-process through run(), where exiting would terminate the caller's
+      // JVM. main() still turns the returned code into the process exit status.
+      return -1;
+    }
+    Job job = createSubmittableJob(args);
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
index 4161ca9..1209e7c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
@@ -17,31 +17,31 @@
  */
 package org.apache.hadoop.hbase.backup.mapreduce;
 
+import static org.apache.hadoop.hbase.backup.util.BackupUtils.failed;
+import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
+
 import java.io.IOException;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
 import org.apache.hadoop.hbase.backup.RestoreJob;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.mapreduce.WALPlayer;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.Tool;
 
+
 /**
  * MapReduce implementation of {@link RestoreJob}
  *
- * For full backup restore, it runs {@link HFileSplitterJob} job and creates
+ * For backup restore, it runs {@link MapReduceHFileSplitterJob} job and creates
  * HFiles which are aligned with a region boundaries of a table being
- * restored, for incremental backup restore it runs {@link WALPlayer} in
- * bulk load mode (creates HFiles from WAL edits).
+ * restored.
  *
  * The resulting HFiles then are loaded using HBase bulk load tool
  * {@link LoadIncrementalHFiles}
@@ -62,8 +62,8 @@ public class MapReduceRestoreJob implements RestoreJob {
 
     String bulkOutputConfKey;
 
-    player = new HFileSplitterJob();
-    bulkOutputConfKey = HFileSplitterJob.BULK_OUTPUT_CONF_KEY;
+    player = new MapReduceHFileSplitterJob();
+    bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
     // Player reads all files in arbitrary directory structure and creates
     // a Map task for each file
     String dirs = StringUtils.join(dirPaths, ",");
@@ -71,8 +71,8 @@ public class MapReduceRestoreJob implements RestoreJob {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Restore " + (fullBackupRestore ? "full" : "incremental")
           + " backup from directory " + dirs + " from hbase tables "
-          + StringUtils.join(tableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND) +
-          " to tables "
+          + StringUtils.join(tableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND)
+          + " to tables "
           + StringUtils.join(newTableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND));
     }
 
@@ -80,13 +80,16 @@ public class MapReduceRestoreJob implements RestoreJob {
 
       LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]);
 
-      Path bulkOutputPath = getBulkOutputDir(getFileNameCompatibleString(newTableNames[i]));
+      Path bulkOutputPath =
+          BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(newTableNames[i]),
+            getConf());
       Configuration conf = getConf();
       conf.set(bulkOutputConfKey, bulkOutputPath.toString());
       String[] playerArgs =
-        { dirs,
-          fullBackupRestore? newTableNames[i].getNameAsString():tableNames[i].getNameAsString()
-        };
+          {
+              dirs,
+              fullBackupRestore ? newTableNames[i].getNameAsString() : tableNames[i]
+                  .getNameAsString() };
 
       int result = 0;
       int loaderResult = 0;
@@ -96,7 +99,7 @@ public class MapReduceRestoreJob implements RestoreJob {
         result = player.run(playerArgs);
         if (succeeded(result)) {
           // do bulk load
-          LoadIncrementalHFiles loader = createLoader(getConf());
+          LoadIncrementalHFiles loader = BackupUtils.createLoader(getConf());
           if (LOG.isDebugEnabled()) {
             LOG.debug("Restoring HFiles from directory " + bulkOutputPath);
           }
@@ -113,58 +116,11 @@ public class MapReduceRestoreJob implements RestoreJob {
         }
         LOG.debug("Restore Job finished:" + result);
       } catch (Exception e) {
+        LOG.error(e);
         throw new IOException("Can not restore from backup directory " + dirs
             + " (check Hadoop and HBase logs) ", e);
       }
-
-    }
-  }
-
-  private String getFileNameCompatibleString(TableName table) {
-    return table.getNamespaceAsString() + "-" + table.getQualifierAsString();
-  }
-
-  private boolean failed(int result) {
-    return result != 0;
-  }
-
-  private boolean succeeded(int result) {
-    return result == 0;
-  }
-
-  public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException {
-    // set configuration for restore:
-    // LoadIncrementalHFile needs more time
-    // <name>hbase.rpc.timeout</name> <value>600000</value>
-    // calculates
-    Integer milliSecInHour = 3600000;
-    Configuration conf = new Configuration(config);
-    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour);
-
-    // By default, it is 32 and loader will fail if # of files in any region exceed this
-    // limit. Bad for snapshot restore.
-    conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
-    conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
-    LoadIncrementalHFiles loader = null;
-    try {
-      loader = new LoadIncrementalHFiles(conf);
-    } catch (Exception e) {
-      throw new IOException(e);
     }
-    return loader;
-  }
-
-  private Path getBulkOutputDir(String tableName) throws IOException {
-    Configuration conf = getConf();
-    FileSystem fs = FileSystem.get(conf);
-    String tmp =
-        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
-          HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
-    Path path =
-        new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-"
-            + EnvironmentEdgeManager.currentTime());
-    fs.deleteOnExit(path);
-    return path;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
index e32853d..ce77645 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -56,7 +56,9 @@ import org.apache.hadoop.hbase.backup.impl.BackupManifest;
 import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@@ -68,14 +70,15 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 public final class BackupUtils {
   protected static final Log LOG = LogFactory.getLog(BackupUtils.class);
   public static final String LOGNAME_SEPARATOR = ".";
+  public static final int MILLISEC_IN_HOUR = 3600000;
 
   private BackupUtils() {
     throw new AssertionError("Instantiating utility class...");
   }
 
   /**
-   * Loop through the RS log timestamp map for the tables, for each RS, find the min timestamp
-   * value for the RS among the tables.
+   * Loop through the RS log timestamp map for the tables, for each RS, find the min timestamp value
+   * for the RS among the tables.
    * @param rsLogTimestampMap timestamp map
    * @return the min timestamp of each RS
    */
@@ -114,16 +117,17 @@ public final class BackupUtils {
   }
 
   /**
-   * copy out Table RegionInfo into incremental backup image need to consider move this
-   * logic into HBackupFileSystem
+   * copy out Table RegionInfo into incremental backup image need to consider move this logic into
+   * HBackupFileSystem
    * @param conn connection
    * @param backupInfo backup info
    * @param conf configuration
    * @throws IOException exception
    * @throws InterruptedException exception
    */
-  public static void copyTableRegionInfo(Connection conn, BackupInfo backupInfo,
-      Configuration conf) throws IOException, InterruptedException {
+  public static void
+      copyTableRegionInfo(Connection conn, BackupInfo backupInfo, Configuration conf)
+          throws IOException, InterruptedException {
     Path rootDir = FSUtils.getRootDir(conf);
     FileSystem fs = rootDir.getFileSystem(conf);
 
@@ -152,10 +156,8 @@ public final class BackupUtils {
       LOG.debug("Starting to write region info for table " + table);
       for (HRegionInfo regionInfo : regions) {
         Path regionDir =
-            HRegion.getRegionDir(new Path(backupInfo.getTableBackupDir(table)),
-              regionInfo);
-        regionDir =
-            new Path(backupInfo.getTableBackupDir(table), regionDir.getName());
+            HRegion.getRegionDir(new Path(backupInfo.getTableBackupDir(table)), regionInfo);
+        regionDir = new Path(backupInfo.getTableBackupDir(table), regionDir.getName());
         writeRegioninfoOnFilesystem(conf, targetFs, regionDir, regionInfo);
       }
       LOG.debug("Finished writing region info for table " + table);
@@ -301,7 +303,6 @@ public final class BackupUtils {
     return ret;
   }
 
-
   /**
    * Check whether the backup path exist
    * @param backupStr backup
@@ -431,8 +432,7 @@ public final class BackupUtils {
    * @param conf configuration
    * @throws IOException exception
    */
-  private static void cleanupHLogDir(BackupInfo backupInfo, Configuration conf)
-      throws IOException {
+  private static void cleanupHLogDir(BackupInfo backupInfo, Configuration conf) throws IOException {
 
     String logDir = backupInfo.getHLogTargetDir();
     if (logDir == null) {
@@ -452,7 +452,6 @@ public final class BackupUtils {
     }
   }
 
-
   private static void cleanupTargetDir(BackupInfo backupInfo, Configuration conf) {
     try {
       // clean up the data at target directory
@@ -498,8 +497,8 @@ public final class BackupUtils {
    * @param tableName table name
    * @return backupPath String for the particular table
    */
-  public static String getTableBackupDir(String backupRootDir, String backupId,
-      TableName tableName) {
+  public static String
+      getTableBackupDir(String backupRootDir, String backupId, TableName tableName) {
     return backupRootDir + Path.SEPARATOR + backupId + Path.SEPARATOR
         + tableName.getNamespaceAsString() + Path.SEPARATOR + tableName.getQualifierAsString()
         + Path.SEPARATOR;
@@ -523,7 +522,6 @@ public final class BackupUtils {
     return list;
   }
 
-
   /**
    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal This accommodates
    * differences between hadoop versions, where hadoop 1 does not throw a FileNotFoundException, and
@@ -655,19 +653,16 @@ public final class BackupUtils {
    * @param backupId backup id
    * @param check check only
    * @param fromTables table list from
-   * @param toTables   table list to
+   * @param toTables table list to
    * @param isOverwrite overwrite data
    * @return request obkect
    */
   public static RestoreRequest createRestoreRequest(String backupRootDir, String backupId,
       boolean check, TableName[] fromTables, TableName[] toTables, boolean isOverwrite) {
     RestoreRequest.Builder builder = new RestoreRequest.Builder();
-    RestoreRequest request = builder.withBackupRootDir(backupRootDir)
-                                    .withBackupId(backupId)
-                                    .withCheck(check)
-                                    .withFromTables(fromTables)
-                                    .withToTables(toTables)
-                                    .withOvewrite(isOverwrite).build();
+    RestoreRequest request =
+        builder.withBackupRootDir(backupRootDir).withBackupId(backupId).withCheck(check)
+            .withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite).build();
     return request;
   }
 
@@ -699,4 +694,54 @@ public final class BackupUtils {
     return isValid;
   }
 
+  /**
+   * Builds the path of a bulk output directory for a table. The path is
+   * {@code <tmp>/bulk_output-<tableName>-<current time>}, where {@code <tmp>} is the value
+   * configured under {@code HConstants.TEMPORARY_FS_DIRECTORY_KEY} or its default. Nothing is
+   * created on the file system here.
+   * @param tableName table name component of the directory name
+   * @param conf configuration used to obtain the file system and the temporary directory
+   * @param deleteOnExit if true, the path is registered with the file system via
+   *          {@code deleteOnExit}
+   * @return the bulk output path
+   * @throws IOException propagated from obtaining the file system or registering the path
+   */
+  public static Path getBulkOutputDir(String tableName, Configuration conf, boolean deleteOnExit)
+      throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    String tmpDir =
+        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
+    String dirName = "bulk_output-" + tableName + "-" + EnvironmentEdgeManager.currentTime();
+    Path bulkOutputDir = new Path(tmpDir + Path.SEPARATOR + dirName);
+    if (deleteOnExit) {
+      fs.deleteOnExit(bulkOutputDir);
+    }
+    return bulkOutputDir;
+  }
+
+  /**
+   * Builds the path of a bulk output directory for a table and registers it for deletion on exit.
+   * @param tableName table name component of the directory name
+   * @param conf configuration used to obtain the file system and the temporary directory
+   * @return the bulk output path
+   * @throws IOException propagated from the three-argument overload
+   */
+  public static Path getBulkOutputDir(String tableName, Configuration conf) throws IOException {
+    final boolean deleteOnExit = true;
+    return getBulkOutputDir(tableName, conf, deleteOnExit);
+  }
+
+  /**
+   * Flattens a table name into a single token that can be embedded in a file name.
+   * @param table table name
+   * @return {@code <namespace>-<qualifier>}
+   */
+  public static String getFileNameCompatibleString(TableName table) {
+    String namespace = table.getNamespaceAsString();
+    String qualifier = table.getQualifierAsString();
+    return namespace + "-" + qualifier;
+  }
+
+  /**
+   * Tells whether a tool/job return code signals failure.
+   * @param result return code to check
+   * @return true for any non-zero code
+   */
+  public static boolean failed(int result) {
+    return !(result == 0);
+  }
+
+  /**
+   * Tells whether a tool/job return code signals success.
+   * @param result return code to check
+   * @return true only for code 0
+   */
+  public static boolean succeeded(int result) {
+    return 0 == result;
+  }
+
+  /**
+   * Creates a bulk load tool configured for restore. The given configuration is copied first, so
+   * the adjustments below do not touch the caller's instance.
+   * @param config base configuration
+   * @return loader working on the adjusted copy of {@code config}
+   * @throws IOException wrapping any exception thrown while constructing the loader
+   */
+  public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException {
+    Configuration loaderConf = new Configuration(config);
+    // LoadIncrementalHFiles needs more time: raise the RPC timeout (hbase.rpc.timeout) to one
+    // hour.
+    loaderConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, MILLISEC_IN_HOUR);
+
+    // By default, it is 32 and loader will fail if # of files in any region exceed this
+    // limit. Bad for snapshot restore.
+    loaderConf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
+    loaderConf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
+    try {
+      return new LoadIncrementalHFiles(loaderConf);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
new file mode 100644
index 0000000..7011ed3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob;
+import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+@Category(LargeTests.class)
+public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {
+  private static final Log LOG = LogFactory.getLog(TestIncrementalBackupMergeWithFailures.class);
+
+  /** Points of the merge procedure at which {@code BackupMergeJobWithFailures} can fail. */
+  enum FailurePhase {
+    /** Checked after the backup root was read, before the per-table loop starts. */
+    PHASE1,
+    /** Checked inside the per-table loop, right before the splitter job is run. */
+    PHASE2,
+    /** Checked after all tables were processed, before they are recorded as processed. */
+    PHASE3,
+    /** Checked after the merged data was moved, before the old backup images are deleted. */
+    PHASE4
+  }
+  public final static String FAILURE_PHASE_KEY = "failurePhase";
+
+  static class BackupMergeJobWithFailures extends MapReduceBackupMergeJob {
+
+    FailurePhase failurePhase;
+
+    /**
+     * Stores the configuration and reads the phase to fail in from {@code FAILURE_PHASE_KEY}.
+     * The test is failed when the key is not set.
+     */
+    @Override
+    public void setConf(Configuration conf) {
+      super.setConf(conf);
+      String phaseName = conf.get(FAILURE_PHASE_KEY);
+      if (phaseName == null) {
+        Assert.fail("Failure phase is not set");
+        return;
+      }
+      failurePhase = FailurePhase.valueOf(phaseName);
+    }
+
+
+    /**
+     * This is the exact copy of parent's run() with injections
+     * of different types of failures: checkFailure() is called at four points (PHASE 1 to
+     * PHASE 4) and throws an IOException when the configured failure phase is reached.
+     * A failure injected in PHASE 1 to PHASE 3 happens before finishedTables is set and therefore
+     * takes the "try it again" branch of the error handling; PHASE 4 takes the "run backup repair
+     * tool" branch.
+     * @param backupIds ids of the backup images to merge
+     * @throws IOException if the merge fails, including the injected failures
+     */
+    @Override
+    public void run(String[] backupIds) throws IOException {
+      String bulkOutputConfKey;
+
+      // TODO : run player on remote cluster
+      player = new MapReduceHFileSplitterJob();
+      bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
+      // Player reads all files in arbitrary directory structure and creates
+      // a Map task for each file
+      String bids = StringUtils.join(backupIds, ",");
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Merge backup images " + bids);
+      }
+
+      List<Pair<TableName, Path>> processedTableList = new ArrayList<Pair<TableName, Path>>();
+      boolean finishedTables = false;
+      Connection conn = ConnectionFactory.createConnection(getConf());
+      BackupSystemTable table = new BackupSystemTable(conn);
+      FileSystem fs = FileSystem.get(getConf());
+
+      try {
+
+        // Start backup exclusive operation
+        table.startBackupExclusiveOperation();
+        // Start merge operation
+        table.startMergeOperation(backupIds);
+
+        // Select most recent backup id
+        String mergedBackupId = findMostRecentBackupId(backupIds);
+
+        TableName[] tableNames = getTableNamesInBackupImages(backupIds);
+        String backupRoot = null;
+
+        BackupInfo bInfo = table.readBackupInfo(backupIds[0]);
+        backupRoot = bInfo.getBackupRootDir();
+        // PHASE 1: fail before any table has been processed
+        checkFailure(FailurePhase.PHASE1);
+
+        for (int i = 0; i < tableNames.length; i++) {
+
+          LOG.info("Merge backup images for " + tableNames[i]);
+
+          // Find input directories for table
+
+          Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
+          String dirs = StringUtils.join(dirPaths, ",");
+          Path bulkOutputPath =
+              BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(tableNames[i]),
+                getConf(), false);
+          // Delete content if exists
+          if (fs.exists(bulkOutputPath)) {
+            if (!fs.delete(bulkOutputPath, true)) {
+              LOG.warn("Can not delete: " + bulkOutputPath);
+            }
+          }
+          Configuration conf = getConf();
+          conf.set(bulkOutputConfKey, bulkOutputPath.toString());
+          String[] playerArgs = { dirs, tableNames[i].getNameAsString() };
+
+          int result = 0;
+          // PHASE 2: fail right before the splitter job is run for this table
+          checkFailure(FailurePhase.PHASE2);
+          player.setConf(getConf());
+          result = player.run(playerArgs);
+          if (succeeded(result)) {
+            // Add to processed table list
+            processedTableList.add(new Pair<TableName, Path>(tableNames[i], bulkOutputPath));
+          } else {
+            throw new IOException("Can not merge backup images for " + dirs
+                + " (check Hadoop/MR and HBase logs). Player return code =" + result);
+          }
+          LOG.debug("Merge Job finished:" + result);
+        }
+        List<TableName> tableList = toTableNameList(processedTableList);
+        // PHASE 3: fail after all tables were processed, before they are recorded as processed
+        checkFailure(FailurePhase.PHASE3);
+        table.updateProcessedTablesForMerge(tableList);
+        finishedTables = true;
+
+        // Move data
+        for (Pair<TableName, Path> tn : processedTableList) {
+          moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
+        }
+        // PHASE 4: fail after the data was moved, before the old backup images are deleted
+        checkFailure(FailurePhase.PHASE4);
+        // Delete old data and update manifest
+        List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
+        deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
+        updateBackupManifest(backupRoot, mergedBackupId, backupsToDelete);
+        // Finish merge session
+        table.finishMergeOperation();
+      } catch (RuntimeException e) {
+        throw e;
+      } catch (Exception e) {
+        LOG.error(e);
+        if (!finishedTables) {
+          // cleanup bulk directories and finish merge
+          // merge MUST be repeated (no need for repair)
+          cleanupBulkLoadDirs(fs, toPathList(processedTableList));
+          table.finishMergeOperation();
+          table.finishBackupExclusiveOperation();
+          throw new IOException("Backup merge operation failed, you should try it again", e);
+        } else {
+          // backup repair must be run
+          throw new IOException(
+              "Backup merge operation failed, run backup repair tool to restore system's integrity",
+              e);
+        }
+      } finally {
+        table.close();
+        conn.close();
+      }
+
+    }
+
+    /**
+     * Simulates a failure at the given point of the merge.
+     * @param phase phase the caller has just reached
+     * @throws IOException carrying the phase name, if {@code phase} is the configured failure
+     *           phase
+     */
+    private void checkFailure(FailurePhase phase) throws IOException {
+      boolean shouldFail = failurePhase != null && phase == failurePhase;
+      if (!shouldFail) {
+        return;
+      }
+      throw new IOException(phase.toString());
+    }
+
+  }
+
+
+  @Test
+  public void TestIncBackupMergeRestore() throws Exception {
+
+    int ADD_ROWS = 99;
+    // #1 - create full backup for all tables
+    LOG.info("create full backup image for all tables");
+
+    List<TableName> tables = Lists.newArrayList(table1, table2);
+    // Set custom Merge Job implementation
+    conf1.setClass(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS,
+      BackupMergeJobWithFailures.class, BackupMergeJob.class);
+
+    Connection conn = ConnectionFactory.createConnection(conf1);
+
+    HBaseAdmin admin = null;
+    admin = (HBaseAdmin) conn.getAdmin();
+    BackupAdminImpl client = new BackupAdminImpl(conn);
+
+    BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
+    String backupIdFull = client.backupTables(request);
+
+    assertTrue(checkSucceeded(backupIdFull));
+
+    // #2 - insert some data to table1
+    HTable t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
+
+    Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS);
+    t1.close();
+    LOG.debug("written " + ADD_ROWS + " rows to " + table1);
+
+    HTable t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);
+
+    Assert.assertEquals(TEST_UTIL.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS);
+    t2.close();
+    LOG.debug("written " + ADD_ROWS + " rows to " + table2);
+
+    // #3 - incremental backup for multiple tables
+    tables = Lists.newArrayList(table1, table2);
+    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
+    String backupIdIncMultiple = client.backupTables(request);
+
+    assertTrue(checkSucceeded(backupIdIncMultiple));
+
+    t1 = insertIntoTable(conn, table1, famName, 2, ADD_ROWS);
+    t1.close();
+
+    t2 = insertIntoTable(conn, table2, famName, 2, ADD_ROWS);
+    t2.close();
+
+    // #3 - incremental backup for multiple tables
+    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
+    String backupIdIncMultiple2 = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple2));
+
+    // #4 Merge backup images with failures
+
+    for ( FailurePhase phase : FailurePhase.values()) {
+      Configuration conf = conn.getConfiguration();
+
+      conf.set(FAILURE_PHASE_KEY, phase.toString());
+
+      try (BackupAdmin bAdmin = new BackupAdminImpl(conn);)
+      {
+        String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
+        bAdmin.mergeBackups(backups);
+        Assert.fail("Expected IOException");
+      } catch (IOException e) {
+        BackupSystemTable table = new BackupSystemTable(conn);
+        if(phase.ordinal() < FailurePhase.PHASE4.ordinal()) {
+          // No need to repair:
+          // Both Merge and backup exclusive operations are finished
+          assertFalse(table.isMergeInProgress());
+          try {
+            table.finishBackupExclusiveOperation();
+            Assert.fail("IOException is expected");
+          } catch(IOException ee) {
+            // Expected
+          }
+        } else {
+          // Repair is required
+          assertTrue(table.isMergeInProgress());
+          try {
+            table.startBackupExclusiveOperation();
+            Assert.fail("IOException is expected");
+          } catch(IOException ee) {
+            // Expected - clean up before proceeding
+            table.finishMergeOperation();
+            table.finishBackupExclusiveOperation();
+          }
+        }
+        table.close();
+        LOG.debug("Expected :"+ e.getMessage());
+      }
+    }
+
+    // Now merge w/o failures
+    Configuration conf = conn.getConfiguration();
+    conf.unset(FAILURE_PHASE_KEY);
+    conf.unset(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS);
+
+    try (BackupAdmin bAdmin = new BackupAdminImpl(conn);) {
+      String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
+      bAdmin.mergeBackups(backups);
+    }
+
+    // #6 - restore incremental backup for multiple tables, with overwrite
+    TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 };
+    TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, table2_restore };
+    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple2, false,
+      tablesRestoreIncMultiple, tablesMapIncMultiple, true));
+
+    Table hTable = conn.getTable(table1_restore);
+    LOG.debug("After incremental restore: " + hTable.getTableDescriptor());
+    LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows");
+    Assert.assertEquals(TEST_UTIL.countRows(hTable, famName), NB_ROWS_IN_BATCH + 2 * ADD_ROWS);
+
+    hTable.close();
+
+    hTable = conn.getTable(table2_restore);
+    Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH + 2 * ADD_ROWS);
+    hTable.close();
+
+    admin.close();
+    conn.close();
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java
index 9c47641..556521f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java
@@ -74,7 +74,7 @@ public class TestRepairAfterFailedDelete extends TestBackupBase {
     admin.restoreSnapshot(snapshotName);
     admin.enableTable(BackupSystemTable.getTableName(conf1));
     // Start backup session
-    table.startBackupSession();
+    table.startBackupExclusiveOperation();
     // Start delete operation
     table.startDeleteOperation(backupIds);