Posted to commits@hbase.apache.org by te...@apache.org on 2018/03/10 17:38:29 UTC

hbase git commit: HBASE-19969: Improve fault tolerance in backup Merge operation

Repository: hbase
Updated Branches:
  refs/heads/master 19a396b9c -> d5aaeee88


HBASE-19969: Improve fault tolerance in backup Merge operation

Signed-off-by: tedyu <yu...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d5aaeee8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d5aaeee8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d5aaeee8

Branch: refs/heads/master
Commit: d5aaeee88b331e064830a2774f4fed238631457c
Parents: 19a396b
Author: Vladimir Rodionov <vr...@hortonworks.com>
Authored: Fri Mar 9 16:49:55 2018 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Sat Mar 10 09:37:55 2018 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/backup/HBackupFileSystem.java  |  19 +++
 .../hbase/backup/impl/BackupAdminImpl.java      |   1 +
 .../hbase/backup/impl/BackupCommands.java       |  66 ++++++--
 .../mapreduce/MapReduceBackupMergeJob.java      | 161 ++++++++++++-------
 .../hadoop/hbase/backup/util/BackupUtils.java   |  12 ++
 .../hadoop/hbase/backup/TestBackupMerge.java    | 132 +++++++++++++++
 .../TestIncrementalBackupMergeWithFailures.java |  40 ++++-
 7 files changed, 351 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d5aaeee8/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
index ff65a64..e097554 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
@@ -63,6 +63,25 @@ public final class HBackupFileSystem {
         + Path.SEPARATOR;
   }
 
+  /**
+   * Get backup temporary directory
+   * @param backupRootDir backup root
+   * @return backup tmp directory path
+   */
+  public static Path getBackupTmpDirPath(String backupRootDir) {
+    return new Path(backupRootDir, ".tmp");
+  }
+
+  /**
+   * Get backup tmp directory for backupId
+   * @param backupRoot backup root
+   * @param backupId backup id
+   * @return backup tmp directory path
+   */
+  public static Path getBackupTmpDirPathForBackupId(String backupRoot, String backupId) {
+    return new Path(getBackupTmpDirPath(backupRoot), backupId);
+  }
+
   public static String getTableBackupDataDir(String backupRootDir, String backupId,
       TableName tableName) {
     return getTableBackupDir(backupRootDir, backupId, tableName) + Path.SEPARATOR + "data";
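
The two helpers above define the staging area the merge now relies on: the image being merged is parked under <backupRoot>/.tmp/<backupId> while its real directory is rewritten, so a crash can be undone by renaming the staged copy back into place. The resulting layout during a merge looks like this (the backup id shown is hypothetical):

  <backupRoot>/
    backup_1520690995000/      <- live image: HBackupFileSystem.getBackupPath(root, id)
    .tmp/                      <- getBackupTmpDirPath(backupRootDir)
      backup_1520690995000/    <- getBackupTmpDirPathForBackupId(backupRoot, backupId)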

http://git-wip-us.apache.org/repos/asf/hbase/blob/d5aaeee8/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
index 776cc1b..0d20f37 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
@@ -619,6 +619,7 @@ public class BackupAdminImpl implements BackupAdmin {
   public void mergeBackups(String[] backupIds) throws IOException {
     try (final BackupSystemTable sysTable = new BackupSystemTable(conn)) {
       checkIfValidForMerge(backupIds, sysTable);
+      // TODO: run job on remote cluster
       BackupMergeJob job = BackupRestoreFactory.getBackupMergeJob(conn.getConfiguration());
       job.run(backupIds);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d5aaeee8/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
index c310178..3c43bb6 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
@@ -58,13 +58,13 @@ import org.apache.hadoop.hbase.backup.BackupRequest;
 import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
 import org.apache.hadoop.hbase.backup.BackupRestoreConstants.BackupCommand;
 import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
 import org.apache.hadoop.hbase.backup.util.BackupSet;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
-
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
 /**
@@ -403,7 +403,7 @@ public final class BackupCommands {
     }
   }
 
-  private static class HelpCommand extends Command {
+  public static class HelpCommand extends Command {
     HelpCommand(Configuration conf, CommandLine cmdline) {
       super(conf);
       this.cmdline = cmdline;
@@ -454,7 +454,7 @@ public final class BackupCommands {
     }
   }
 
-  private static class DescribeCommand extends Command {
+  public static class DescribeCommand extends Command {
     DescribeCommand(Configuration conf, CommandLine cmdline) {
       super(conf);
       this.cmdline = cmdline;
@@ -492,7 +492,7 @@ public final class BackupCommands {
     }
   }
 
-  private static class ProgressCommand extends Command {
+  public static class ProgressCommand extends Command {
     ProgressCommand(Configuration conf, CommandLine cmdline) {
       super(conf);
       this.cmdline = cmdline;
@@ -547,7 +547,7 @@ public final class BackupCommands {
     }
   }
 
-  private static class DeleteCommand extends Command {
+  public static class DeleteCommand extends Command {
     DeleteCommand(Configuration conf, CommandLine cmdline) {
       super(conf);
       this.cmdline = cmdline;
@@ -586,7 +586,7 @@ public final class BackupCommands {
     }
   }
 
-  private static class RepairCommand extends Command {
+  public static class RepairCommand extends Command {
     RepairCommand(Configuration conf, CommandLine cmdline) {
       super(conf);
       this.cmdline = cmdline;
@@ -661,8 +661,9 @@ public final class BackupCommands {
       System.out.println("DELETE operation finished OK: " + StringUtils.join(backupIds));
     }
 
-    private void repairFailedBackupMergeIfAny(Connection conn, BackupSystemTable sysTable)
+    public static void repairFailedBackupMergeIfAny(Connection conn, BackupSystemTable sysTable)
         throws IOException {
+
       String[] backupIds = sysTable.getListOfBackupIdsFromMergeOperation();
       if (backupIds == null || backupIds.length == 0) {
         System.out.println("No failed backup MERGE operation found");
@@ -671,17 +672,52 @@ public final class BackupCommands {
         return;
       }
       System.out.println("Found failed MERGE operation for: " + StringUtils.join(backupIds));
-      System.out.println("Running MERGE again ...");
+      // Check if backup .tmp exists
+      BackupInfo bInfo = sysTable.readBackupInfo(backupIds[0]);
+      String backupRoot = bInfo.getBackupRootDir();
+      FileSystem fs = FileSystem.get(new Path(backupRoot).toUri(), new Configuration());
+      String backupId = BackupUtils.findMostRecentBackupId(backupIds);
+      Path tmpPath = HBackupFileSystem.getBackupTmpDirPathForBackupId(backupRoot, backupId);
+      if (fs.exists(tmpPath)) {
+        // Move data back
+        Path destPath = HBackupFileSystem.getBackupPath(backupRoot, backupId);
+        if (!fs.delete(destPath, true)) {
+          System.out.println("Failed to delete " + destPath);
+        }
+        boolean res = fs.rename(tmpPath, destPath);
+        if (!res) {
+          throw new IOException("MERGE repair: failed  to rename from "+ tmpPath+" to "+ destPath);
+        }
+        System.out.println("MERGE repair: renamed from "+ tmpPath+" to "+ destPath+" res="+ res);
+      } else {
+        checkRemoveBackupImages(fs, backupRoot, backupIds);
+      }
       // Restore table from snapshot
       BackupSystemTable.restoreFromSnapshot(conn);
-      // Unlock backupo system
+      // Unlock backup system
       sysTable.finishBackupExclusiveOperation();
       // Finish previous failed session
       sysTable.finishMergeOperation();
-      try (BackupAdmin admin = new BackupAdminImpl(conn)) {
-        admin.mergeBackups(backupIds);
+
+      System.out.println("MERGE repair operation finished OK: " + StringUtils.join(backupIds));
+    }
+
+    private static void checkRemoveBackupImages(FileSystem fs, String backupRoot,
+      String[] backupIds) throws IOException {
+      String mergedBackupId = BackupUtils.findMostRecentBackupId(backupIds);
+      for (String backupId: backupIds) {
+        if (backupId.equals(mergedBackupId)) {
+          continue;
+        }
+        Path path = HBackupFileSystem.getBackupPath(backupRoot, backupId);
+        if (fs.exists(path)) {
+          if (!fs.delete(path, true)) {
+            System.out.println("MERGE repair removing: "+ path +" - FAILED");
+          } else {
+            System.out.println("MERGE repair removing: "+ path +" - OK");
+          }
+        }
       }
-      System.out.println("MERGE operation finished OK: " + StringUtils.join(backupIds));
     }
 
     @Override
@@ -690,7 +726,7 @@ public final class BackupCommands {
     }
   }
 
-  private static class MergeCommand extends Command {
+  public static class MergeCommand extends Command {
     MergeCommand(Configuration conf, CommandLine cmdline) {
       super(conf);
       this.cmdline = cmdline;
@@ -739,7 +775,7 @@ public final class BackupCommands {
     }
   }
 
-  private static class HistoryCommand extends Command {
+  public static class HistoryCommand extends Command {
     private final static int DEFAULT_HISTORY_LENGTH = 10;
 
     HistoryCommand(Configuration conf, CommandLine cmdline) {
@@ -862,7 +898,7 @@ public final class BackupCommands {
     }
   }
 
-  private static class BackupSetCommand extends Command {
+  public static class BackupSetCommand extends Command {
     private final static String SET_ADD_CMD = "add";
     private final static String SET_REMOVE_CMD = "remove";
     private final static String SET_DELETE_CMD = "delete";
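
Because RepairCommand.repairFailedBackupMergeIfAny() is now public and static, the recovery path the "repair" command runs can also be driven programmatically (the failure test further below does exactly that). A minimal sketch, assuming hbase-site.xml on the classpath supplies the cluster configuration; MergeRepairTool is a hypothetical wrapper class:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.backup.impl.BackupCommands;
  import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;

  public class MergeRepairTool {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      try (Connection conn = ConnectionFactory.createConnection(conf);
           BackupSystemTable sysTable = new BackupSystemTable(conn)) {
        // Renames the .tmp copy back into place (or removes already-merged
        // source images), restores the backup system table from snapshot,
        // and releases the exclusive lock.
        BackupCommands.RepairCommand.repairFailedBackupMergeIfAny(conn, sysTable);
      }
    }
  }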

http://git-wip-us.apache.org/repos/asf/hbase/blob/d5aaeee8/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
index 3fcf692..f71b62a 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
@@ -19,12 +19,13 @@ package org.apache.hadoop.hbase.backup.mapreduce;
 
 import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
 
-import java.io.FileNotFoundException;
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.Stack;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -36,7 +37,6 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupMergeJob;
-import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
 import org.apache.hadoop.hbase.backup.HBackupFileSystem;
 import org.apache.hadoop.hbase.backup.impl.BackupManifest;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
@@ -105,7 +105,7 @@ public class MapReduceBackupMergeJob implements BackupMergeJob {
       table.startMergeOperation(backupIds);
 
       // Select most recent backup id
-      String mergedBackupId = findMostRecentBackupId(backupIds);
+      String mergedBackupId = BackupUtils.findMostRecentBackupId(backupIds);
 
       TableName[] tableNames = getTableNamesInBackupImages(backupIds);
 
@@ -146,15 +146,34 @@ public class MapReduceBackupMergeJob implements BackupMergeJob {
       table.updateProcessedTablesForMerge(tableList);
       finishedTables = true;
 
-      // Move data
+      // PHASE 2 (modification of the backup file system)
+      // Move the existing mergedBackupId data into the tmp directory;
+      // we will need it later in case of a failure.
+      Path tmpBackupDir = HBackupFileSystem.getBackupTmpDirPathForBackupId(backupRoot,
+        mergedBackupId);
+      Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, mergedBackupId);
+
+      if (!fs.rename(backupDirPath, tmpBackupDir)) {
+        throw new IOException("Failed to rename "+ backupDirPath +" to "+tmpBackupDir);
+      } else {
+        LOG.debug("Renamed "+ backupDirPath +" to "+ tmpBackupDir);
+      }
+      // Move new data into backup dest
       for (Pair<TableName, Path> tn : processedTableList) {
         moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
       }
-
-      // Delete old data and update manifest
+      // Update backup manifest
       List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
+      updateBackupManifest(tmpBackupDir.getParent().toString(), mergedBackupId, backupsToDelete);
+      // Copy meta files back from tmp to backup dir
+      copyMetaData(fs, tmpBackupDir, backupDirPath);
+      // Delete the tmp dir (repair renames it back on failure)
+      if (!fs.delete(tmpBackupDir, true)) {
+        // WARN and ignore
+        LOG.warn("Could not delete tmp dir: " + tmpBackupDir);
+      }
+      // Delete old data
       deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
-      updateBackupManifest(backupRoot, mergedBackupId, backupsToDelete);
       // Finish merge session
       table.finishMergeOperation();
       // Release lock
@@ -183,6 +202,80 @@ public class MapReduceBackupMergeJob implements BackupMergeJob {
     }
   }
 
+  /**
+   * Copy the metadata of a backup session
+   * @param fs file system
+   * @param tmpBackupDir temp backup directory, where the metadata is located
+   * @param backupDirPath new path for backup
+   * @throws IOException exception
+   */
+  protected void copyMetaData(FileSystem fs, Path tmpBackupDir, Path backupDirPath)
+      throws IOException {
+    RemoteIterator<LocatedFileStatus> it = fs.listFiles(tmpBackupDir, true);
+    List<Path> toKeep = new ArrayList<Path>();
+    while (it.hasNext()) {
+      Path p = it.next().getPath();
+      if (fs.isDirectory(p)) {
+        continue;
+      }
+      // Keep meta
+      String fileName = p.toString();
+      if (fileName.indexOf(FSTableDescriptors.TABLEINFO_DIR) > 0
+          || fileName.indexOf(HRegionFileSystem.REGION_INFO_FILE) > 0) {
+        toKeep.add(p);
+      }
+    }
+    // Copy meta to destination
+    for (Path p : toKeep) {
+      Path newPath = convertToDest(p, backupDirPath);
+      copyFile(fs, p, newPath);
+    }
+  }
+
+  /**
+   * Copy file in DFS from p to newPath
+   * @param fs file system
+   * @param p old path
+   * @param newPath new path
+   * @throws IOException exception
+   */
+  protected void copyFile(FileSystem fs, Path p, Path newPath) throws IOException {
+    File f = File.createTempFile("data", "meta");
+    Path localPath = new Path(f.getAbsolutePath());
+    fs.copyToLocalFile(p, localPath);
+    fs.copyFromLocalFile(localPath, newPath);
+    boolean exists = fs.exists(newPath);
+    if (!exists) {
+      throw new IOException("Failed to copy meta file to: "+ newPath);
+    }
+  }
+
+  /**
+   * Rebases a path from the tmp backup directory onto the destination backup directory
+   * @param p path under the tmp backup directory
+   * @param backupDirPath destination backup directory
+   * @return converted path
+   */
+  protected Path convertToDest(Path p, Path backupDirPath) {
+    String backupId = backupDirPath.getName();
+    Stack<String> stack = new Stack<String>();
+    String name = null;
+    while (true) {
+      name = p.getName();
+      if (!name.equals(backupId)) {
+        stack.push(name);
+        p = p.getParent();
+      } else {
+        break;
+      }
+    }
+    Path newPath = new Path(backupDirPath.toString());
+    while (!stack.isEmpty()) {
+      newPath = new Path(newPath, stack.pop());
+    }
+    return newPath;
+  }
+
   protected List<Path> toPathList(List<Pair<TableName, Path>> processedTableList) {
     ArrayList<Path> list = new ArrayList<>();
     for (Pair<TableName, Path> p : processedTableList) {
@@ -251,11 +344,6 @@ public class MapReduceBackupMergeJob implements BackupMergeJob {
     Path dest =
         new Path(HBackupFileSystem.getTableBackupDir(backupRoot, mergedBackupId, tableName));
 
-    // Delete all *data* files in dest
-    if (!deleteData(fs, dest)) {
-      throw new IOException("Could not delete " + dest);
-    }
-
     FileStatus[] fsts = fs.listStatus(bulkOutputPath);
     for (FileStatus fst : fsts) {
       if (fst.isDirectory()) {
@@ -265,54 +353,13 @@ public class MapReduceBackupMergeJob implements BackupMergeJob {
           if (!fs.delete(newDst, true)) {
             throw new IOException("failed to delete :"+ newDst);
           }
+        } else {
+          fs.mkdirs(dest);
         }
-        fs.rename(fst.getPath(), dest);
-      }
-    }
-  }
-
-  /**
-   * Deletes only data files and keeps all META
-   * @param fs file system instance
-   * @param dest destination location
-   * @return true, if success, false - otherwise
-   * @throws FileNotFoundException exception
-   * @throws IOException exception
-   */
-  private boolean deleteData(FileSystem fs, Path dest) throws FileNotFoundException, IOException {
-    RemoteIterator<LocatedFileStatus> it = fs.listFiles(dest, true);
-    List<Path> toDelete = new ArrayList<Path>();
-    while (it.hasNext()) {
-      Path p = it.next().getPath();
-      if (fs.isDirectory(p)) {
-        continue;
-      }
-      // Keep meta
-      String fileName  = p.toString();
-      if (fileName.indexOf(FSTableDescriptors.TABLEINFO_DIR) > 0 ||
-          fileName.indexOf(HRegionFileSystem.REGION_INFO_FILE) > 0) {
-        continue;
-      }
-      toDelete.add(p);
-    }
-    for (Path p : toDelete) {
-      boolean result = fs.delete(p, false);
-      if (!result) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  protected String findMostRecentBackupId(String[] backupIds) {
-    long recentTimestamp = Long.MIN_VALUE;
-    for (String backupId : backupIds) {
-      long ts = Long.parseLong(backupId.split("_")[1]);
-      if (ts > recentTimestamp) {
-        recentTimestamp = ts;
+        boolean result = fs.rename(fst.getPath(), dest);
+        LOG.debug("MoveData from "+ fst.getPath() +" to "+ dest+" result="+ result);
       }
     }
-    return BackupRestoreConstants.BACKUPID_PREFIX + recentTimestamp;
   }
 
   protected TableName[] getTableNamesInBackupImages(String[] backupIds) throws IOException {
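
Two details of the new flow above are worth spelling out. The merge now has an explicit second phase: the current mergedBackupId image is renamed into the .tmp staging directory before any destination data is touched, so the repair command can restore it wholesale if the job dies. Afterwards copyMetaData() carries the .tabledesc and .regioninfo files back from the staged copy, and convertToDest() rebases each staged path onto the destination by stacking path components until it reaches the backupId segment. A worked example of that rebasing, with a hypothetical table t1 in the default namespace:

  // p             = <root>/.tmp/backup_1520690995000/default/t1/.tabledesc/.tableinfo.0000000001
  // backupDirPath = <root>/backup_1520690995000
  // The loop pushes .tableinfo.0000000001, .tabledesc, t1, default onto the
  // stack and stops at the "backup_1520690995000" segment; popping rebuilds
  //                 <root>/backup_1520690995000/default/t1/.tabledesc/.tableinfo.0000000001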

http://git-wip-us.apache.org/repos/asf/hbase/blob/d5aaeee8/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
index 18548f5..96ecab9 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -740,4 +740,16 @@ public final class BackupUtils {
     }
     return loader;
   }
+
+  public static String findMostRecentBackupId(String[] backupIds) {
+    long recentTimestamp = Long.MIN_VALUE;
+    for (String backupId : backupIds) {
+      long ts = Long.parseLong(backupId.split("_")[1]);
+      if (ts > recentTimestamp) {
+        recentTimestamp = ts;
+      }
+    }
+    return BackupRestoreConstants.BACKUPID_PREFIX + recentTimestamp;
+  }
+
 }
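
findMostRecentBackupId, moved here from MapReduceBackupMergeJob so the repair path can share it, assumes the standard <prefix><timestamp> backup id format. A small fragment sketching the contract (the ids are hypothetical):

  // Timestamps follow the first "_"; the largest one wins.
  String[] ids = { "backup_1520690995000", "backup_1520691000123" };
  String latest = BackupUtils.findMostRecentBackupId(ids);
  // latest == "backup_1520691000123"
  //        == BackupRestoreConstants.BACKUPID_PREFIX + 1520691000123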

http://git-wip-us.apache.org/repos/asf/hbase/blob/d5aaeee8/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java
new file mode 100644
index 0000000..8ead548
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category(LargeTests.class)
+public class TestBackupMerge extends TestBackupBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestBackupMerge.class);
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestBackupMerge.class);
+
+  @Test
+  public void TestIncBackupMergeRestore() throws Exception {
+    int ADD_ROWS = 99;
+    // #1 - create full backup for all tables
+    LOG.info("create full backup image for all tables");
+
+    List<TableName> tables = Lists.newArrayList(table1, table2);
+
+    Connection conn = ConnectionFactory.createConnection(conf1);
+
+    HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
+    BackupAdminImpl client = new BackupAdminImpl(conn);
+
+    BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
+    String backupIdFull = client.backupTables(request);
+
+    assertTrue(checkSucceeded(backupIdFull));
+
+    // #2 - insert some data into table1
+    HTable t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
+
+    Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS);
+    t1.close();
+    LOG.debug("written " + ADD_ROWS + " rows to " + table1);
+
+    HTable t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);
+
+    Assert.assertEquals(TEST_UTIL.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS);
+    t2.close();
+    LOG.debug("written " + ADD_ROWS + " rows to " + table2);
+
+    // #3 - incremental backup for multiple tables
+    tables = Lists.newArrayList(table1, table2);
+    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
+    String backupIdIncMultiple = client.backupTables(request);
+
+    assertTrue(checkSucceeded(backupIdIncMultiple));
+
+    t1 = insertIntoTable(conn, table1, famName, 2, ADD_ROWS);
+    t1.close();
+
+    t2 = insertIntoTable(conn, table2, famName, 2, ADD_ROWS);
+    t2.close();
+
+    // #4 - second incremental backup for multiple tables
+    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
+    String backupIdIncMultiple2 = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple2));
+
+    try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) {
+      String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
+      bAdmin.mergeBackups(backups);
+    }
+
+    // #5 - restore incremental backup for multiple tables, with overwrite
+    TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 };
+    TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, table2_restore };
+    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple2, false,
+      tablesRestoreIncMultiple, tablesMapIncMultiple, true));
+
+    Table hTable = conn.getTable(table1_restore);
+    LOG.debug("After incremental restore: " + hTable.getTableDescriptor());
+    int countRows = TEST_UTIL.countRows(hTable, famName);
+    LOG.debug("f1 has " + countRows + " rows");
+    Assert.assertEquals(NB_ROWS_IN_BATCH + 2 * ADD_ROWS, countRows);
+
+    hTable.close();
+
+    hTable = conn.getTable(table2_restore);
+    Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH + 2 * ADD_ROWS);
+    hTable.close();
+
+    admin.close();
+    conn.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d5aaeee8/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
index 7ce5050..57bdc46 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.backup.impl.BackupCommands;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
 import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob;
 import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
@@ -113,7 +114,7 @@ public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {
         table.startMergeOperation(backupIds);
 
         // Select most recent backup id
-        String mergedBackupId = findMostRecentBackupId(backupIds);
+        String mergedBackupId = BackupUtils.findMostRecentBackupId(backupIds);
 
         TableName[] tableNames = getTableNamesInBackupImages(backupIds);
 
@@ -160,18 +161,38 @@ public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {
         table.updateProcessedTablesForMerge(tableList);
         finishedTables = true;
 
-        // Move data
+        // (modification of the backup file system)
+        // Move the existing mergedBackupId data into the tmp directory;
+        // we will need it later in case of a failure.
+        Path tmpBackupDir = HBackupFileSystem.getBackupTmpDirPathForBackupId(backupRoot,
+          mergedBackupId);
+        Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, mergedBackupId);
+        if (!fs.rename(backupDirPath, tmpBackupDir)) {
+          throw new IOException("Failed to rename " + backupDirPath + " to " + tmpBackupDir);
+        } else {
+          LOG.debug("Renamed " + backupDirPath + " to " + tmpBackupDir);
+        }
+        // Move new data into backup dest
         for (Pair<TableName, Path> tn : processedTableList) {
           moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
         }
-        // PHASE 4
         checkFailure(FailurePhase.PHASE4);
-        // Delete old data and update manifest
+        // Update backup manifest
         List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
+        updateBackupManifest(tmpBackupDir.getParent().toString(), mergedBackupId, backupsToDelete);
+        // Copy meta files back from tmp to backup dir
+        copyMetaData(fs, tmpBackupDir, backupDirPath);
+        // Delete the tmp dir (repair renames it back on failure)
+        if (!fs.delete(tmpBackupDir, true)) {
+          // WARN and ignore
+          LOG.warn("Could not delete tmp dir: " + tmpBackupDir);
+        }
+        // Delete old data
         deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
-        updateBackupManifest(backupRoot, mergedBackupId, backupsToDelete);
         // Finish merge session
         table.finishMergeOperation();
+        // Release lock
+        table.finishBackupExclusiveOperation();
       } catch (RuntimeException e) {
         throw e;
       } catch (Exception e) {
@@ -285,8 +306,8 @@ public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {
             Assert.fail("IOException is expected");
           } catch(IOException ee) {
             // Expected - clean up before proceeding
-            table.finishMergeOperation();
-            table.finishBackupExclusiveOperation();
+            //table.finishMergeOperation();
+            //table.finishBackupExclusiveOperation();
           }
         }
         table.close();
@@ -297,7 +318,10 @@ public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {
     Configuration conf = conn.getConfiguration();
     conf.unset(FAILURE_PHASE_KEY);
     conf.unset(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS);
-
+    // Now run repair
+    BackupSystemTable sysTable = new BackupSystemTable(conn);
+    BackupCommands.RepairCommand.repairFailedBackupMergeIfAny(conn, sysTable);
+    // Now repeat merge
     try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) {
       String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
       bAdmin.mergeBackups(backups);