Posted to commits@hbase.apache.org by el...@apache.org on 2018/01/12 19:07:19 UTC

hbase git commit: HBASE-19568: Restore of HBase table using incremental backup doesn't restore rows from an earlier incremental backup

Repository: hbase
Updated Branches:
  refs/heads/master 057e80c16 -> a5601c8ea


HBASE-19568: Restore of HBase table using incremental backup doesn't restore rows from an earlier incremental backup

Signed-off-by: Josh Elser <el...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a5601c8e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a5601c8e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a5601c8e

Branch: refs/heads/master
Commit: a5601c8eac6bfcac7d869574547f505d44e49065
Parents: 057e80c
Author: Vladimir Rodionov <vr...@hortonworks.com>
Authored: Wed Jan 10 16:26:09 2018 -0800
Committer: Josh Elser <el...@apache.org>
Committed: Fri Jan 12 13:13:17 2018 -0500

----------------------------------------------------------------------
 .../hbase/backup/impl/BackupAdminImpl.java      |   2 +-
 .../hadoop/hbase/backup/impl/BackupManager.java |  19 +--
 .../hbase/backup/impl/BackupSystemTable.java    | 135 +++++++++++++------
 .../impl/IncrementalTableBackupClient.java      |  59 ++++++--
 .../hbase/backup/impl/RestoreTablesClient.java  |  55 ++++----
 .../hbase/backup/impl/TableBackupClient.java    |  32 ++---
 .../hadoop/hbase/backup/TestBackupBase.java     |   4 -
 .../TestIncrementalBackupWithBulkLoad.java      |  24 +++-
 8 files changed, 213 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a5601c8e/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
index 8ba57d2..f27490c 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
@@ -271,7 +271,7 @@ public class BackupAdminImpl implements BackupAdmin {
         LOG.debug(numDeleted + " bulk loaded files out of " + map.size() + " were deleted");
       }
       if (success) {
-        sysTable.deleteBulkLoadedFiles(map);
+        sysTable.deleteBulkLoadedRows(new ArrayList<byte[]>(map.keySet()));
       }
 
       sysTable.deleteBackupInfo(backupInfo.getBackupId());
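
For context, a minimal sketch of the new delete path (sysTable and backupInfo as in the surrounding code; the map keys are bulk-load row keys):

    Map<byte[], String> map = sysTable.readBulkLoadedFiles(backupInfo.getBackupId());
    // Bulk-load metadata rows now live in a dedicated system table, so the
    // delete API takes the raw row keys rather than the file map itself.
    sysTable.deleteBulkLoadedRows(new ArrayList<byte[]>(map.keySet()));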

http://git-wip-us.apache.org/repos/asf/hbase/blob/a5601c8e/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index 7199fd5..4ca998c 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
+import org.apache.hadoop.hbase.backup.BackupObserver;
 import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.HBackupFileSystem;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
 import org.apache.hadoop.hbase.backup.regionserver.LogRollRegionServerProcedureManager;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.procedure.ProcedureManagerHost;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -140,10 +142,14 @@ public class BackupManager implements Closeable {
       conf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY, classes + ","
           + regionProcedureClass);
     }
+    String coproc = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
+    String regionObserverClass = BackupObserver.class.getName();
+    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, (coproc == null ? "" : coproc + ",") +
+        regionObserverClass);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Added region procedure manager: " + regionProcedureClass);
+      LOG.debug("Added region procedure manager: " + regionProcedureClass +
+        ". Added region observer: " + regionObserverClass);
     }
-
   }
 
   public static boolean isBackupEnabled(Configuration conf) {
@@ -415,13 +421,8 @@ public class BackupManager implements Closeable {
     return systemTable.readBulkloadRows(tableList);
   }
 
-  public void removeBulkLoadedRows(List<TableName> lst, List<byte[]> rows) throws IOException {
-    systemTable.removeBulkLoadedRows(lst, rows);
-  }
-
-  public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps)
-      throws IOException {
-    systemTable.writeBulkLoadedFiles(sTableList, maps, backupInfo.getBackupId());
+  public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
+    systemTable.deleteBulkLoadedRows(rows);
   }
 
   /**
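
One way to see the effect of this decoration (an illustrative sketch, not part of the patch; assumes backup is enabled so the decoration is applied):

    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
    BackupManager.decorateRegionServerConfiguration(conf);
    // BackupObserver should now be appended after any pre-existing coprocessors
    String coprocs = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
    assert coprocs != null && coprocs.contains(BackupObserver.class.getName());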

http://git-wip-us.apache.org/repos/asf/hbase/blob/a5601c8e/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index 6b721d4..cf34d14 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -42,8 +42,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -53,6 +51,8 @@ import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -62,6 +62,8 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.SnapshotDescription;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
@@ -122,7 +124,21 @@ public final class BackupSystemTable implements Closeable {
 
   }
 
+  /**
+   * Backup system table (main) name
+   */
   private TableName tableName;
+
+  /**
+   * Backup system table name for bulk-loaded files.
+   * All bulk-loaded file references are kept in a separate table because
+   * general backup operations (create, merge, etc.) must be isolated from
+   * the activity of the RegionObserver that tracks bulk loads:
+   * {@link org.apache.hadoop.hbase.backup.BackupObserver}
+   */
+
+  private TableName bulkLoadTableName;
+
   /**
    * Stores backup sessions (contexts)
    */
@@ -174,20 +190,29 @@ public final class BackupSystemTable implements Closeable {
 
   public BackupSystemTable(Connection conn) throws IOException {
     this.connection = conn;
-    tableName = BackupSystemTable.getTableName(conn.getConfiguration());
+    Configuration conf = this.connection.getConfiguration();
+    tableName = BackupSystemTable.getTableName(conf);
+    bulkLoadTableName = BackupSystemTable.getTableNameForBulkLoadedData(conf);
     checkSystemTable();
   }
 
   private void checkSystemTable() throws IOException {
     try (Admin admin = connection.getAdmin()) {
       verifyNamespaceExists(admin);
-
+      Configuration conf = connection.getConfiguration();
       if (!admin.tableExists(tableName)) {
-        HTableDescriptor backupHTD =
-            BackupSystemTable.getSystemTableDescriptor(connection.getConfiguration());
+        TableDescriptor backupHTD =
+            BackupSystemTable.getSystemTableDescriptor(conf);
         admin.createTable(backupHTD);
       }
-      waitForSystemTable(admin);
+      if (!admin.tableExists(bulkLoadTableName)) {
+        TableDescriptor blHTD =
+            BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf);
+        admin.createTable(blHTD);
+      }
+      waitForSystemTable(admin, tableName);
+      waitForSystemTable(admin, bulkLoadTableName);
+
     }
   }
 
@@ -207,7 +232,7 @@ public final class BackupSystemTable implements Closeable {
     }
   }
 
-  private void waitForSystemTable(Admin admin) throws IOException {
+  private void waitForSystemTable(Admin admin, TableName tableName) throws IOException {
     long TIMEOUT = 60000;
     long startTime = EnvironmentEdgeManager.currentTime();
     while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) {
@@ -216,10 +241,11 @@ public final class BackupSystemTable implements Closeable {
       } catch (InterruptedException e) {
       }
       if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) {
-        throw new IOException("Failed to create backup system table after " + TIMEOUT + "ms");
+        throw new IOException("Failed to create backup system table " +
+            tableName + " after " + TIMEOUT + "ms");
       }
     }
-    LOG.debug("Backup table exists and available");
+    LOG.debug("Backup table "+tableName+" exists and available");
 
   }
 
@@ -251,7 +277,7 @@ public final class BackupSystemTable implements Closeable {
    */
   Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException {
     Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
-    try (Table table = connection.getTable(tableName);
+    try (Table table = connection.getTable(bulkLoadTableName);
         ResultScanner scanner = table.getScanner(scan)) {
       Result res = null;
       Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@@ -279,7 +305,7 @@ public final class BackupSystemTable implements Closeable {
       throws IOException {
     Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
     Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList == null ? 1 : sTableList.size()];
-    try (Table table = connection.getTable(tableName);
+    try (Table table = connection.getTable(bulkLoadTableName);
         ResultScanner scanner = table.getScanner(scan)) {
       Result res = null;
       while ((res = scanner.next()) != null) {
@@ -324,18 +350,6 @@ public final class BackupSystemTable implements Closeable {
     }
   }
 
-  /*
-   * @param map Map of row keys to path of bulk loaded hfile
-   */
-  void deleteBulkLoadedFiles(Map<byte[], String> map) throws IOException {
-    try (Table table = connection.getTable(tableName)) {
-      List<Delete> dels = new ArrayList<>();
-      for (byte[] row : map.keySet()) {
-        dels.add(new Delete(row).addFamily(BackupSystemTable.META_FAMILY));
-      }
-      table.delete(dels);
-    }
-  }
 
   /**
   * Deletes backup status from backup system table
@@ -366,7 +380,7 @@ public final class BackupSystemTable implements Closeable {
       LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size()
           + " entries");
     }
-    try (Table table = connection.getTable(tableName)) {
+    try (Table table = connection.getTable(bulkLoadTableName)) {
       List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths);
       table.put(puts);
       LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
@@ -386,7 +400,7 @@ public final class BackupSystemTable implements Closeable {
       LOG.debug("write bulk load descriptor to backup " + tabName + " with " + pairs.size()
           + " entries");
     }
-    try (Table table = connection.getTable(tableName)) {
+    try (Table table = connection.getTable(bulkLoadTableName)) {
       List<Put> puts =
           BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs);
       table.put(puts);
@@ -399,8 +413,8 @@ public final class BackupSystemTable implements Closeable {
    * @param lst list of table names
    * @param rows the rows to be deleted
    */
-  public void removeBulkLoadedRows(List<TableName> lst, List<byte[]> rows) throws IOException {
-    try (Table table = connection.getTable(tableName)) {
+  public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
+    try (Table table = connection.getTable(bulkLoadTableName)) {
       List<Delete> lstDels = new ArrayList<>();
       for (byte[] row : rows) {
         Delete del = new Delete(row);
@@ -408,7 +422,7 @@ public final class BackupSystemTable implements Closeable {
         LOG.debug("orig deleting the row: " + Bytes.toString(row));
       }
       table.delete(lstDels);
-      LOG.debug("deleted " + rows.size() + " original bulkload rows for " + lst.size() + " tables");
+      LOG.debug("deleted " + rows.size() + " original bulkload rows");
     }
   }
 
@@ -425,7 +439,7 @@ public final class BackupSystemTable implements Closeable {
     for (TableName tTable : tableList) {
       Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable);
       Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable);
-      try (Table table = connection.getTable(tableName);
+      try (Table table = connection.getTable(bulkLoadTableName);
           ResultScanner scanner = table.getScanner(scan)) {
         Result res = null;
         while ((res = scanner.next()) != null) {
@@ -480,7 +494,7 @@ public final class BackupSystemTable implements Closeable {
    */
   public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps,
       String backupId) throws IOException {
-    try (Table table = connection.getTable(tableName)) {
+    try (Table table = connection.getTable(bulkLoadTableName)) {
       long ts = EnvironmentEdgeManager.currentTime();
       int cnt = 0;
       List<Put> puts = new ArrayList<>();
@@ -1311,21 +1325,28 @@ public final class BackupSystemTable implements Closeable {
    * Get backup system table descriptor
    * @return table's descriptor
    */
-  public static HTableDescriptor getSystemTableDescriptor(Configuration conf) {
+  public static TableDescriptor getSystemTableDescriptor(Configuration conf) {
+
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(getTableName(conf));
+
+    ColumnFamilyDescriptorBuilder colBuilder =
+        ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY);
 
-    HTableDescriptor tableDesc = new HTableDescriptor(getTableName(conf));
-    HColumnDescriptor colSessionsDesc = new HColumnDescriptor(SESSIONS_FAMILY);
-    colSessionsDesc.setMaxVersions(1);
-    // Time to keep backup sessions (secs)
+    colBuilder.setMaxVersions(1);
     Configuration config = HBaseConfiguration.create();
     int ttl =
         config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
           BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
-    colSessionsDesc.setTimeToLive(ttl);
-    tableDesc.addFamily(colSessionsDesc);
-    HColumnDescriptor colMetaDesc = new HColumnDescriptor(META_FAMILY);
-    tableDesc.addFamily(colMetaDesc);
-    return tableDesc;
+    colBuilder.setTimeToLive(ttl);
+
+    ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
+    builder.addColumnFamily(colSessionsDesc);
+
+    colBuilder =
+        ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
+    colBuilder.setTimeToLive(ttl);
+    builder.addColumnFamily(colBuilder.build());
+    return builder.build();
   }
 
   public static TableName getTableName(Configuration conf) {
@@ -1344,6 +1365,38 @@ public final class BackupSystemTable implements Closeable {
   }
 
   /**
+   * Get descriptor of the backup system table for bulk-loaded data
+   * @return table's descriptor
+   */
+  public static TableDescriptor getSystemTableForBulkLoadedDataDescriptor(Configuration conf) {
+
+    TableDescriptorBuilder builder =
+        TableDescriptorBuilder.newBuilder(getTableNameForBulkLoadedData(conf));
+
+    ColumnFamilyDescriptorBuilder colBuilder =
+        ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY);
+    colBuilder.setMaxVersions(1);
+    Configuration config = HBaseConfiguration.create();
+    int ttl =
+        config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
+          BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
+    colBuilder.setTimeToLive(ttl);
+    ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
+    builder.addColumnFamily(colSessionsDesc);
+    colBuilder =
+        ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
+    colBuilder.setTimeToLive(ttl);
+    builder.addColumnFamily(colBuilder.build());
+    return builder.build();
+  }
+
+  public static TableName getTableNameForBulkLoadedData(Configuration conf) {
+    String name =
+        conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
+          BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk";
+    return TableName.valueOf(name);
+  }
+  /**
    * Creates Put operation for a given backup info object
    * @param context backup info
    * @return put operation
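
The net effect is a second system table dedicated to bulk-load metadata. A minimal sketch of the naming relationship (assuming the default system table name, e.g. backup:system):

    Configuration conf = HBaseConfiguration.create();
    TableName main = BackupSystemTable.getTableName(conf);                  // e.g. backup:system
    TableName bulk = BackupSystemTable.getTableNameForBulkLoadedData(conf); // e.g. backup:system_bulk
    // checkSystemTable() above creates and waits for both tables.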

http://git-wip-us.apache.org/repos/asf/hbase/blob/a5601c8e/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index a966ad3..34d713d 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -72,7 +72,6 @@ public class IncrementalTableBackupClient extends TableBackupClient {
   }
 
   protected List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
     List<String> list = new ArrayList<String>();
     for (String file : incrBackupFileList) {
       Path p = new Path(file);
@@ -110,6 +109,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
    * @param sTableList list of tables to be backed up
    * @return map of table to List of files
    */
+  @SuppressWarnings("unchecked")
   protected Map<byte[], List<Path>>[] handleBulkLoad(List<TableName> sTableList) throws IOException {
     Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()];
     List<String> activeFiles = new ArrayList<String>();
@@ -117,7 +117,6 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair =
     backupManager.readBulkloadRows(sTableList);
     Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = pair.getFirst();
-    FileSystem fs = FileSystem.get(conf);
     FileSystem tgtFs;
     try {
       tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
@@ -126,6 +125,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     }
     Path rootdir = FSUtils.getRootDir(conf);
     Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);
+
     for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> tblEntry :
       map.entrySet()) {
       TableName srcTable = tblEntry.getKey();
@@ -192,26 +192,47 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     }
 
     copyBulkLoadedFiles(activeFiles, archiveFiles);
-
-    backupManager.writeBulkLoadedFiles(sTableList, mapForSrc);
-    backupManager.removeBulkLoadedRows(sTableList, pair.getSecond());
+    backupManager.deleteBulkLoadedRows(pair.getSecond());
     return mapForSrc;
   }
 
   private void copyBulkLoadedFiles(List<String> activeFiles, List<String> archiveFiles)
-    throws IOException
-  {
+      throws IOException {
 
     try {
       // Enable special mode of BackupDistCp
       conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5);
       // Copy active files
       String tgtDest = backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId();
-      if (activeFiles.size() > 0) {
+      int attempt = 1;
+      while (activeFiles.size() > 0) {
+        LOG.info("Copy "+ activeFiles.size() +
+          " active bulk loaded files. Attempt ="+ (attempt++));
         String[] toCopy = new String[activeFiles.size()];
         activeFiles.toArray(toCopy);
-        incrementalCopyHFiles(toCopy, tgtDest);
+        // An active file can be archived during the copy operation,
+        // so we need to handle this properly
+        try {
+          incrementalCopyHFiles(toCopy, tgtDest);
+          break;
+        } catch (IOException e) {
+          // Check whether some files were archived and update the active
+          // and archived lists accordingly. When a file is moved from the
+          // active directory to the archive directory, the number of
+          // active files decreases.
+
+          int numOfActive = activeFiles.size();
+          updateFileLists(activeFiles, archiveFiles);
+          if (activeFiles.size() < numOfActive) {
+            continue;
+          }
+          // if nothing was archived, re-throw the original exception
+          throw e;
+        }
       }
+      // If the incremental copy fails for the archived files, we will have
+      // partially loaded files in the backup destination (only files from the
+      // active data directory). That is OK: the backup will be marked as
+      // FAILED and the data will be cleaned up.
       if (archiveFiles.size() > 0) {
         String[] toCopy = new String[archiveFiles.size()];
         archiveFiles.toArray(toCopy);
@@ -224,6 +245,25 @@ public class IncrementalTableBackupClient extends TableBackupClient {
 
   }
 
+  private void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
+      throws IOException {
+    List<String> newlyArchived = new ArrayList<String>();
+
+    for (String spath : activeFiles) {
+      if (!fs.exists(new Path(spath))) {
+        newlyArchived.add(spath);
+      }
+    }
+
+    if (newlyArchived.size() > 0) {
+      activeFiles.removeAll(newlyArchived);
+      archiveFiles.addAll(newlyArchived);
+    }
+
+    LOG.debug(newlyArchived.size() + " files have been archived.");
+
+  }
+
   @Override
   public void execute() throws IOException {
 
@@ -322,7 +362,6 @@ public class IncrementalTableBackupClient extends TableBackupClient {
   protected void deleteBulkLoadDirectory() throws IOException {
     // delete original bulk load directory on method exit
     Path path = getBulkOutputDir();
-    FileSystem fs = FileSystem.get(conf);
     boolean result = fs.delete(path, true);
     if (!result) {
       LOG.warn("Could not delete " + path);

http://git-wip-us.apache.org/repos/asf/hbase/blob/a5601c8e/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
index d3d3e06..c6b6bad 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
@@ -21,33 +21,31 @@ package org.apache.hadoop.hbase.backup.impl;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.HBackupFileSystem;
 import org.apache.hadoop.hbase.backup.RestoreRequest;
 import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
-import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.backup.util.RestoreTool;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
 
 /**
  * Restore table implementation
@@ -171,8 +169,10 @@ public class RestoreTablesClient {
     for (int i = 1; i < images.length; i++) {
       BackupImage im = images[i];
       String fileBackupDir =
-          HBackupFileSystem.getTableBackupDataDir(im.getRootDir(), im.getBackupId(), sTable);
-      dirList.add(new Path(fileBackupDir));
+         HBackupFileSystem.getTableBackupDir(im.getRootDir(), im.getBackupId(), sTable);
+      List<Path> list = getFilesRecursively(fileBackupDir);
+      dirList.addAll(list);
+
     }
 
     String dirs = StringUtils.join(dirList, ",");
@@ -185,6 +185,20 @@ public class RestoreTablesClient {
     LOG.info(sTable + " has been successfully restored to " + tTable);
   }
 
+  private List<Path> getFilesRecursively(String fileBackupDir)
+      throws IllegalArgumentException, IOException {
+    FileSystem fs = FileSystem.get((new Path(fileBackupDir)).toUri(), new Configuration());
+    List<Path> list = new ArrayList<Path>();
+    RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path(fileBackupDir), true);
+    while (it.hasNext()) {
+      Path p = it.next().getPath();
+      if (HFile.isHFileFormat(fs, p)) {
+        list.add(p);
+      }
+    }
+    return list;
+  }
+
   /**
    * Restore operation. Stage 2: resolved Backup Image dependency
    * @param backupManifestMap : tableName, Manifest
@@ -226,27 +240,6 @@ public class RestoreTablesClient {
         }
       }
     }
-    try (BackupSystemTable table = new BackupSystemTable(conn)) {
-      List<TableName> sTableList = Arrays.asList(sTableArray);
-      for (String id : backupIdSet) {
-        LOG.debug("restoring bulk load for " + id);
-        Map<byte[], List<Path>>[] mapForSrc = table.readBulkLoadedFiles(id, sTableList);
-        Map<LoadQueueItem, ByteBuffer> loaderResult;
-        conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
-        LoadIncrementalHFiles loader = BackupUtils.createLoader(conf);
-        for (int i = 0; i < sTableList.size(); i++) {
-          if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) {
-            loaderResult = loader.run(mapForSrc[i], tTableArray[i]);
-            LOG.debug("bulk loading " + sTableList.get(i) + " to " + tTableArray[i]);
-            if (loaderResult.isEmpty()) {
-              String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " + tTableArray[i];
-              LOG.error(msg);
-              throw new IOException(msg);
-            }
-          }
-        }
-      }
-    }
     LOG.debug("restoreStage finished");
   }
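
Each image is now restored from the HFiles actually present under its backup directory rather than from a single data directory path. A self-contained sketch of the discovery step (the paths are illustrative):

    FileSystem fs = FileSystem.get(URI.create("hdfs:///backup"), new Configuration());
    RemoteIterator<LocatedFileStatus> it =
        fs.listFiles(new Path("/backup/backup_1515789792000/default/table1"), true);
    while (it.hasNext()) {
      Path p = it.next().getPath();
      if (HFile.isHFileFormat(fs, p)) { // skip manifests and other non-HFile content
        System.out.println("will restore: " + p);
      }
    }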
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a5601c8e/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
index 09ab28c..ab24cca 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
@@ -69,6 +69,7 @@ public abstract class TableBackupClient {
 
   protected BackupManager backupManager;
   protected BackupInfo backupInfo;
+  protected FileSystem fs;
 
   public TableBackupClient() {
   }
@@ -90,6 +91,7 @@ public abstract class TableBackupClient {
     this.tableList = request.getTableList();
     this.conn = conn;
     this.conf = conn.getConfiguration();
+    this.fs = FSUtils.getCurrentFileSystem(conf);
     backupInfo =
         backupManager.createBackupInfo(backupId, request.getBackupType(), tableList,
           request.getTargetRootDir(), request.getTotalTasks(), request.getBandwidth());
@@ -258,22 +260,21 @@ public abstract class TableBackupClient {
     }
   }
 
-  public static void cleanupAndRestoreBackupSystem (Connection conn, BackupInfo backupInfo,
-      Configuration conf) throws IOException
-  {
+  public static void cleanupAndRestoreBackupSystem(Connection conn, BackupInfo backupInfo,
+      Configuration conf) throws IOException {
     BackupType type = backupInfo.getType();
-     // if full backup, then delete HBase snapshots if there already are snapshots taken
-     // and also clean up export snapshot log files if exist
-     if (type == BackupType.FULL) {
-       deleteSnapshots(conn, backupInfo, conf);
-       cleanupExportSnapshotLog(conf);
-     }
-     BackupSystemTable.restoreFromSnapshot(conn);
-     BackupSystemTable.deleteSnapshot(conn);
-     // clean up the uncompleted data at target directory if the ongoing backup has already entered
-     // the copy phase
-     // For incremental backup, DistCp logs will be cleaned with the targetDir.
-     cleanupTargetDir(backupInfo, conf);
+    // if full backup, then delete HBase snapshots if there already are snapshots taken
+    // and also clean up export snapshot log files if exist
+    if (type == BackupType.FULL) {
+      deleteSnapshots(conn, backupInfo, conf);
+      cleanupExportSnapshotLog(conf);
+    }
+    BackupSystemTable.restoreFromSnapshot(conn);
+    BackupSystemTable.deleteSnapshot(conn);
+    // clean up the uncompleted data at target directory if the ongoing backup has already entered
+    // the copy phase
+    // For incremental backup, DistCp logs will be cleaned with the targetDir.
+    cleanupTargetDir(backupInfo, conf);
   }
 
 
@@ -355,7 +356,6 @@ public abstract class TableBackupClient {
    */
   protected void cleanupDistCpLog(BackupInfo backupInfo, Configuration conf) throws IOException {
     Path rootPath = new Path(backupInfo.getHLogTargetDir()).getParent();
-    FileSystem fs = FileSystem.get(rootPath.toUri(), conf);
     FileStatus[] files = FSUtils.listStatus(fs, rootPath);
     if (files == null) {
       return;

http://git-wip-us.apache.org/repos/asf/hbase/blob/a5601c8e/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index 8ce1c0e..2be7784 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -57,7 +57,6 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.access.SecureTestUtil;
@@ -297,9 +296,6 @@ public class TestBackupBase {
       // setup configuration
       SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration());
     }
-    String coproc = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
-    conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, (coproc == null ? "" : coproc + ",") +
-        BackupObserver.class.getName());
     conf1.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
     BackupManager.decorateMasterConfiguration(conf1);
     BackupManager.decorateRegionServerConfiguration(conf1);

http://git-wip-us.apache.org/repos/asf/hbase/blob/a5601c8e/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
index db33abe..ed1d010 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
@@ -113,18 +113,32 @@ public class TestIncrementalBackupWithBulkLoad extends TestBackupBase {
     request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
     String backupIdIncMultiple = client.backupTables(request);
     assertTrue(checkSucceeded(backupIdIncMultiple));
+    // #4 bulk load again
+    LOG.debug("bulk loading into " + testName);
+    int actual1 = TestLoadIncrementalHFiles.loadHFiles(testName, table1Desc, TEST_UTIL, famName,
+      qualName, false, null,
+      new byte[][][] { new byte[][] { Bytes.toBytes("ppp"), Bytes.toBytes("qqq") },
+          new byte[][] { Bytes.toBytes("rrr"), Bytes.toBytes("sss") }, },
+      true, false, true, NB_ROWS_IN_BATCH * 2 + actual, NB_ROWS2);
 
+    // #5 - incremental backup for table1
+    tables = Lists.newArrayList(table1);
+    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
+    String backupIdIncMultiple1 = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple1));
+    // Delete all data in table1
+    TEST_UTIL.deleteTableData(table1);
     // #5.1 - check tables for full restore */
     HBaseAdmin hAdmin = TEST_UTIL.getHBaseAdmin();
 
     // #6 - restore incremental backup for table1
     TableName[] tablesRestoreIncMultiple = new TableName[] { table1 };
-    TableName[] tablesMapIncMultiple = new TableName[] { table1_restore };
-    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple,
-      false, tablesRestoreIncMultiple, tablesMapIncMultiple, true));
+    //TableName[] tablesMapIncMultiple = new TableName[] { table1_restore };
+    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple1,
+      false, tablesRestoreIncMultiple, tablesRestoreIncMultiple, true));
 
-    HTable hTable = (HTable) conn.getTable(table1_restore);
-    Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH * 2+actual);
+    HTable hTable = (HTable) conn.getTable(table1);
+    Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH * 2 + actual + actual1);
     request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
 
     backupIdFull = client.backupTables(request);