Posted to commits@hbase.apache.org by te...@apache.org on 2017/03/28 23:23:44 UTC

hbase git commit: HBASE-14417 Incremental backup and bulk loading

Repository: hbase
Updated Branches:
  refs/heads/master cb4fac1d1 -> 0345fc877


HBASE-14417 Incremental backup and bulk loading


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0345fc87
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0345fc87
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0345fc87

Branch: refs/heads/master
Commit: 0345fc87759a7d44ecc385327ebb586fc632fb65
Parents: cb4fac1
Author: tedyu <yu...@gmail.com>
Authored: Tue Mar 28 16:23:36 2017 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Tue Mar 28 16:23:36 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/backup/BackupHFileCleaner.java | 180 +++++++++
 .../hadoop/hbase/backup/BackupObserver.java     | 102 +++++
 .../hbase/backup/impl/BackupAdminImpl.java      |  28 +-
 .../hadoop/hbase/backup/impl/BackupManager.java |  16 +
 .../hbase/backup/impl/BackupSystemTable.java    | 393 ++++++++++++++++++-
 .../impl/IncrementalTableBackupClient.java      | 124 ++++++
 .../hbase/backup/impl/RestoreTablesClient.java  |  96 +++--
 .../backup/mapreduce/MapReduceRestoreJob.java   |   6 +-
 .../hbase/mapreduce/LoadIncrementalHFiles.java  |  20 +-
 .../hadoop/hbase/util/HFileArchiveUtil.java     |  16 +
 .../hadoop/hbase/backup/TestBackupBase.java     |  15 +
 .../hbase/backup/TestBackupHFileCleaner.java    | 141 +++++++
 .../TestIncrementalBackupWithBulkLoad.java      | 145 +++++++
 .../mapreduce/TestLoadIncrementalHFiles.java    |  46 ++-
 14 files changed, 1275 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
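
How the pieces fit together: BackupObserver records bulk loaded hfiles in the backup system table, BackupHFileCleaner keeps archived hfiles that backups still reference, IncrementalTableBackupClient copies the recorded hfiles into the backup image, and RestoreTablesClient replays them through LoadIncrementalHFiles. The patch does not itself register the two new classes; below is a minimal wiring sketch, assuming the standard master cleaner-plugin and region-coprocessor properties (the class itself is illustrative only, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class BackupBulkLoadWiringSketch {
      public static Configuration backupAwareConf() {
        Configuration conf = HBaseConfiguration.create();
        // Master-side hfile cleaner chain; in a real deployment append to the existing
        // value so any cleaners already configured (e.g. the TTL cleaner) stay active.
        conf.set("hbase.master.hfilecleaner.plugins",
            "org.apache.hadoop.hbase.backup.BackupHFileCleaner");
        // Region-side coprocessor that records bulk loads in the backup system table.
        conf.set("hbase.coprocessor.region.classes",
            "org.apache.hadoop.hbase.backup.BackupObserver");
        return conf;
      }
    }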


http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupHFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupHFileCleaner.java
new file mode 100644
index 0000000..b6b4c0a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupHFileCleaner.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
+/**
+ * Implementation of a file cleaner that checks if an hfile is still referenced by a backup before
+ * deleting it from the hfile archive directory.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class BackupHFileCleaner extends BaseHFileCleanerDelegate implements Abortable {
+  private static final Log LOG = LogFactory.getLog(BackupHFileCleaner.class);
+  private boolean stopped = false;
+  private boolean aborted;
+  private Configuration conf;
+  private Connection connection;
+  private long prevReadFromBackupTbl = 0, // timestamp of most recent read from hbase:backup table
+      secondPrevReadFromBackupTbl = 0; // timestamp of 2nd most recent read from hbase:backup table
+  //used by unit test to skip reading hbase:backup
+  private boolean checkForFullyBackedUpTables = true;
+  private List<TableName> fullyBackedUpTables = null;
+
+  private Set<String> getFilenameFromBulkLoad(Map<byte[], List<Path>>[] maps) {
+    Set<String> filenames = new HashSet<String>();
+    for (Map<byte[], List<Path>> map : maps) {
+      if (map == null) continue;
+      for (List<Path> paths : map.values()) {
+        for (Path p : paths) {
+          filenames.add(p.getName());
+        }
+      }
+    }
+    return filenames;
+  }
+
+  private Set<String> loadHFileRefs(List<TableName> tableList) throws IOException {
+    if (connection == null) {
+      connection = ConnectionFactory.createConnection(conf);
+    }
+    try (BackupSystemTable tbl = new BackupSystemTable(connection)) {
+      Map<byte[], List<Path>>[] res =
+          tbl.readBulkLoadedFiles(null, tableList);
+      secondPrevReadFromBackupTbl = prevReadFromBackupTbl;
+      prevReadFromBackupTbl = EnvironmentEdgeManager.currentTime();
+      return getFilenameFromBulkLoad(res);
+    }
+  }
+
+  @VisibleForTesting
+  void setCheckForFullyBackedUpTables(boolean b) {
+    checkForFullyBackedUpTables = b;
+  }
+  @Override
+  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
+    if (conf == null) {
+      return files;
+    }
+    // obtain the Set of TableName's which have been fully backed up
+    // so that we can filter the bulk load records read from the backup system table
+    if (checkForFullyBackedUpTables) {
+      if (connection == null) return files;
+      try (BackupSystemTable tbl = new BackupSystemTable(connection)) {
+        fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
+      } catch (IOException ioe) {
+        LOG.error("Failed to get tables which have been fully backed up, skipping checking", ioe);
+        return Collections.emptyList();
+      }
+      Collections.sort(fullyBackedUpTables);
+    }
+    final Set<String> hfileRefs;
+    try {
+      hfileRefs = loadHFileRefs(fullyBackedUpTables);
+    } catch (IOException ioe) {
+      LOG.error("Failed to read hfile references, skipping checking deletable files", ioe);
+      return Collections.emptyList();
+    }
+    Iterable<FileStatus> deletables = Iterables.filter(files, new Predicate<FileStatus>() {
+      @Override
+      public boolean apply(FileStatus file) {
+        // If the file is recent, be conservative and wait for one more scan of hbase:backup table
+        if (file.getModificationTime() > secondPrevReadFromBackupTbl) {
+          return false;
+        }
+        String hfile = file.getPath().getName();
+        boolean foundHFileRef = hfileRefs.contains(hfile);
+        return !foundHFileRef;
+      }
+    });
+    return deletables;
+  }
+
+  @Override
+  public boolean isFileDeletable(FileStatus fStat) {
+    // work is done in getDeletableFiles()
+    return true;
+  }
+
+  @Override
+  public void setConf(Configuration config) {
+    this.conf = config;
+    this.connection = null;
+    try {
+      this.connection = ConnectionFactory.createConnection(conf);
+    } catch (IOException ioe) {
+      LOG.error("Couldn't establish connection", ioe);
+    }
+  }
+
+  @Override
+  public void stop(String why) {
+    if (this.stopped) {
+      return;
+    }
+    if (this.connection != null) {
+      try {
+        this.connection.close();
+      } catch (IOException ioe) {
+        LOG.debug("Got " + ioe + " when closing connection");
+      }
+    }
+    this.stopped = true;
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped;
+  }
+
+  @Override
+  public void abort(String why, Throwable e) {
+    LOG.warn("Aborting ReplicationHFileCleaner because " + why, e);
+    this.aborted = true;
+    stop(why);
+  }
+
+  @Override
+  public boolean isAborted() {
+    return this.aborted;
+  }
+}
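
The cleaner above is deliberately conservative: it keeps the timestamps of its two most recent reads of hbase:backup and never releases an archived hfile newer than the second most recent read, so a freshly constructed cleaner reports nothing deletable on its first pass. A minimal sketch of driving it directly, roughly what a unit test might do (the candidateFiles listing and a running, backup-enabled cluster are assumed):

    package org.apache.hadoop.hbase.backup;  // setCheckForFullyBackedUpTables is package-private

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class BackupHFileCleanerSketch {
      // candidateFiles would come from listing the hfile archive directory.
      static Iterable<FileStatus> deletable(Iterable<FileStatus> candidateFiles) {
        Configuration conf = HBaseConfiguration.create();
        BackupHFileCleaner cleaner = new BackupHFileCleaner();
        cleaner.setConf(conf);                          // also opens the shared Connection
        cleaner.setCheckForFullyBackedUpTables(false);  // test hook: skip the full-backup lookup
        // The first pass retains everything because secondPrevReadFromBackupTbl is still 0;
        // from the second pass on, files older than the previous hbase:backup read and not
        // recorded as bulk loaded hfiles are released for deletion.
        cleaner.getDeletableFiles(candidateFiles);
        return cleaner.getDeletableFiles(candidateFiles);
      }
    }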

http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
new file mode 100644
index 0000000..595e862
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * An Observer to facilitate backup operations
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class BackupObserver implements RegionObserver {
+  private static final Log LOG = LogFactory.getLog(BackupObserver.class);
+  @Override
+  public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+    List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths,
+    boolean hasLoaded) throws IOException {
+    Configuration cfg = ctx.getEnvironment().getConfiguration();
+    if (!hasLoaded) {
+      // there is no need to record state
+      return hasLoaded;
+    }
+    if (finalPaths == null || !BackupManager.isBackupEnabled(cfg)) {
+      LOG.debug("skipping recording bulk load in postBulkLoadHFile since backup is disabled");
+      return hasLoaded;
+    }
+    try (Connection connection = ConnectionFactory.createConnection(cfg);
+        BackupSystemTable tbl = new BackupSystemTable(connection)) {
+      List<TableName> fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
+      HRegionInfo info = ctx.getEnvironment().getRegionInfo();
+      TableName tableName = info.getTable();
+      if (!fullyBackedUpTables.contains(tableName)) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(tableName + " has not gone through full backup");
+        }
+        return hasLoaded;
+      }
+      tbl.writePathsPostBulkLoad(tableName, info.getEncodedNameAsBytes(), finalPaths);
+      return hasLoaded;
+    } catch (IOException ioe) {
+      LOG.error("Failed to get tables which have been fully backed up", ioe);
+      return false;
+    }
+  }
+  @Override
+  public void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+      final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
+    Configuration cfg = ctx.getEnvironment().getConfiguration();
+    if (pairs == null || pairs.isEmpty() || !BackupManager.isBackupEnabled(cfg)) {
+      LOG.debug("skipping recording bulk load in preCommitStoreFile since backup is disabled");
+      return;
+    }
+    try (Connection connection = ConnectionFactory.createConnection(cfg);
+        BackupSystemTable tbl = new BackupSystemTable(connection)) {
+      List<TableName> fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
+      HRegionInfo info = ctx.getEnvironment().getRegionInfo();
+      TableName tableName = info.getTable();
+      if (!fullyBackedUpTables.contains(tableName)) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(tableName + " has not gone through full backup");
+        }
+        return;
+      }
+      tbl.writeFilesForBulkLoadPreCommit(tableName, info.getEncodedNameAsBytes(), family, pairs);
+      return;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
index c1d5258..eb60860 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
@@ -188,7 +188,33 @@ public class BackupAdminImpl implements BackupAdmin {
           removeTableFromBackupImage(info, tn, sysTable);
         }
       }
-      LOG.debug("Delete backup info " + backupInfo.getBackupId());
+      Map<byte[], String> map = sysTable.readBulkLoadedFiles(backupId);
+      FileSystem fs = FileSystem.get(conn.getConfiguration());
+      boolean succ = true;
+      int numDeleted = 0;
+      for (String f : map.values()) {
+        Path p = new Path(f);
+        try {
+          LOG.debug("Delete backup info " + p + " for " + backupInfo.getBackupId());
+          if (!fs.delete(p)) {
+            if (fs.exists(p)) {
+              LOG.warn(f + " was not deleted");
+              succ = false;
+            }
+          } else {
+            numDeleted++;
+          }
+        } catch (IOException ioe) {
+          LOG.warn(f + " was not deleted", ioe);
+          succ = false;
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(numDeleted + " bulk loaded files out of " + map.size() + " were deleted");
+      }
+      if (succ) {
+        sysTable.deleteBulkLoadedFiles(map);
+      }
 
       sysTable.deleteBackupInfo(backupInfo.getBackupId());
       LOG.info("Delete backup " + backupInfo.getBackupId() + " completed.");

http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index 1d27e79..c09ce48 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.procedure.ProcedureManagerHost;
+import org.apache.hadoop.hbase.util.Pair;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -393,6 +395,20 @@ public class BackupManager implements Closeable {
     return systemTable.readRegionServerLastLogRollResult(backupInfo.getBackupRootDir());
   }
 
+  public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
+  readBulkloadRows(List<TableName> tableList) throws IOException {
+    return systemTable.readBulkloadRows(tableList);
+  }
+
+  public void removeBulkLoadedRows(List<TableName> lst, List<byte[]> rows) throws IOException {
+    systemTable.removeBulkLoadedRows(lst, rows);
+  }
+
+  public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps)
+      throws IOException {
+    systemTable.writeBulkLoadedFiles(sTableList, maps, backupInfo.getBackupId());
+  }
+
   /**
    * Get all completed backup information (in desc order by time)
    * @return history info of BackupCompleteData

http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index 6362f8e..1ba8087 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -22,11 +22,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.TreeSet;
 
 import org.apache.commons.lang.StringUtils;
@@ -35,6 +37,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -44,6 +47,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
 import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
@@ -59,6 +63,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * This class provides API to access backup system table<br>
@@ -77,6 +82,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  */
 @InterfaceAudience.Private
 public final class BackupSystemTable implements Closeable {
+  private static final Log LOG = LogFactory.getLog(BackupSystemTable.class);
 
   static class WALItem {
     String backupId;
@@ -108,8 +114,6 @@ public final class BackupSystemTable implements Closeable {
 
   }
 
-  private static final Log LOG = LogFactory.getLog(BackupSystemTable.class);
-
   private TableName tableName;
   /**
    *  Stores backup sessions (contexts)
@@ -119,6 +123,7 @@ public final class BackupSystemTable implements Closeable {
    * Stores other meta
    */
   final static byte[] META_FAMILY = "meta".getBytes();
+  final static byte[] BULK_LOAD_FAMILY = "bulk".getBytes();
   /**
    *  Connection to HBase cluster, shared among all instances
    */
@@ -130,9 +135,22 @@ public final class BackupSystemTable implements Closeable {
   private final static String INCR_BACKUP_SET = "incrbackupset:";
   private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
   private final static String RS_LOG_TS_PREFIX = "rslogts:";
+
+  private final static String BULK_LOAD_PREFIX = "bulk:";
+  private final static byte[] BULK_LOAD_PREFIX_BYTES = BULK_LOAD_PREFIX.getBytes();
+  final static byte[] TBL_COL = Bytes.toBytes("tbl");
+  final static byte[] FAM_COL = Bytes.toBytes("fam");
+  final static byte[] PATH_COL = Bytes.toBytes("path");
+  final static byte[] STATE_COL = Bytes.toBytes("state");
+  // the two states a bulk loaded file can be
+  final static byte[] BL_PREPARE = Bytes.toBytes("R");
+  final static byte[] BL_COMMIT = Bytes.toBytes("D");
+
   private final static String WALS_PREFIX = "wals:";
   private final static String SET_KEY_PREFIX = "backupset:";
 
+  // separator between BULK_LOAD_PREFIX and ordinals
+  protected final static String BLK_LD_DELIM = ":";
   private final static byte[] EMPTY_VALUE = new byte[] {};
 
   // Safe delimiter in a string
@@ -196,6 +214,97 @@ public final class BackupSystemTable implements Closeable {
     }
   }
 
+  /*
+   * @param backupId the backup Id
+   * @return Map of rows to path of bulk loaded hfile
+   */
+  Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException {
+    Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
+    try (Table table = connection.getTable(tableName);
+        ResultScanner scanner = table.getScanner(scan)) {
+      Result res = null;
+      Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+      while ((res = scanner.next()) != null) {
+        res.advance();
+        byte[] row = CellUtil.cloneRow(res.listCells().get(0));
+        for (Cell cell : res.listCells()) {
+          if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
+              BackupSystemTable.PATH_COL.length) == 0) {
+            map.put(row, Bytes.toString(CellUtil.cloneValue(cell)));
+          }
+        }
+      }
+      return map;
+    }
+  }
+
+  /*
+   * Used during restore
+   * @param backupId the backup Id
+   * @param sTableList List of tables
+   * @return array of Map of family to List of Paths
+   */
+  public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<TableName> sTableList)
+      throws IOException {
+    Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
+    Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList == null ? 1 : sTableList.size()];
+    try (Table table = connection.getTable(tableName);
+        ResultScanner scanner = table.getScanner(scan)) {
+      Result res = null;
+      while ((res = scanner.next()) != null) {
+        res.advance();
+        TableName tbl = null;
+        byte[] fam = null;
+        String path = null;
+        for (Cell cell : res.listCells()) {
+          if (CellComparator.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0,
+              BackupSystemTable.TBL_COL.length) == 0) {
+            tbl = TableName.valueOf(CellUtil.cloneValue(cell));
+          } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
+              BackupSystemTable.FAM_COL.length) == 0) {
+            fam = CellUtil.cloneValue(cell);
+          } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
+              BackupSystemTable.PATH_COL.length) == 0) {
+            path = Bytes.toString(CellUtil.cloneValue(cell));
+          }
+        }
+        int srcIdx = IncrementalTableBackupClient.getIndex(tbl, sTableList);
+        if (srcIdx == -1) {
+          // the table is not among the tables queried
+          continue;
+        }
+        if (mapForSrc[srcIdx] == null) {
+          mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+        }
+        List<Path> files;
+        if (!mapForSrc[srcIdx].containsKey(fam)) {
+          files = new ArrayList<Path>();
+          mapForSrc[srcIdx].put(fam, files);
+        } else {
+          files = mapForSrc[srcIdx].get(fam);
+        }
+        files.add(new Path(path));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("found bulk loaded file : " + tbl + " " +  Bytes.toString(fam) + " " + path);
+        }
+      }
+      return mapForSrc;
+    }
+  }
+
+  /*
+   * @param map Map of row keys to path of bulk loaded hfile
+   */
+  void deleteBulkLoadedFiles(Map<byte[], String> map) throws IOException {
+    try (Table table = connection.getTable(tableName)) {
+      List<Delete> dels = new ArrayList<>();
+      for (byte[] row : map.keySet()) {
+        dels.add(new Delete(row).addFamily(BackupSystemTable.META_FAMILY));
+      }
+      table.delete(dels);
+    }
+  }
+
   /**
    * Deletes backup status from backup system table table
    * @param backupId backup id
@@ -213,6 +322,156 @@ public final class BackupSystemTable implements Closeable {
     }
   }
 
+  /*
+   * For postBulkLoadHFile() hook.
+   * @param tabName table name
+   * @param region the region receiving hfile
+   * @param finalPaths family and associated hfiles
+   */
+  public void writePathsPostBulkLoad(TableName tabName, byte[] region,
+      Map<byte[], List<Path>> finalPaths) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("write bulk load descriptor to backup " + tabName + " with " +
+          finalPaths.size() + " entries");
+    }
+    try (Table table = connection.getTable(tableName)) {
+      List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region,
+          finalPaths);
+      table.put(puts);
+      LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
+    }
+  }
+  /*
+   * For preCommitStoreFile() hook
+   * @param tabName table name
+   * @param region the region receiving hfile
+   * @param family column family
+   * @param pairs list of paths for hfiles
+   */
+  public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region,
+      final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("write bulk load descriptor to backup " + tabName + " with " +
+          pairs.size() + " entries");
+    }
+    try (Table table = connection.getTable(tableName)) {
+      List<Put> puts = BackupSystemTable.createPutForPreparedBulkload(tabName, region,
+          family, pairs);
+      table.put(puts);
+      LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
+    }
+  }
+
+  /*
+   * Removes rows recording bulk loaded hfiles from backup table
+   * @param lst list of table names
+   * @param rows the rows to be deleted
+   */
+  public void removeBulkLoadedRows(List<TableName> lst, List<byte[]> rows) throws IOException {
+    try (Table table = connection.getTable(tableName)) {
+      List<Delete> lstDels = new ArrayList<>();
+      for (byte[] row : rows) {
+        Delete del = new Delete(row);
+        lstDels.add(del);
+        LOG.debug("orig deleting the row: " + Bytes.toString(row));
+      }
+      table.delete(lstDels);
+      LOG.debug("deleted " + rows.size() + " original bulkload rows for " + lst.size() + " tables");
+    }
+  }
+
+  /*
+   * Reads the rows from backup table recording bulk loaded hfiles
+   * @param tableList list of table names
+   * @return a pair of a map and the row keys read; the map is keyed by table, region and column
+   *  family, and each value pairs an hfile path with whether the preCommitStoreFile hook wrote it
+   */
+  public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
+  readBulkloadRows(List<TableName> tableList) throws IOException {
+    Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>();
+    List<byte[]> rows = new ArrayList<>();
+    for (TableName tTable : tableList) {
+      Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable);
+      Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable);
+      try (Table table = connection.getTable(tableName);
+          ResultScanner scanner = table.getScanner(scan)) {
+        Result res = null;
+        while ((res = scanner.next()) != null) {
+          res.advance();
+          String fam = null;
+          String path = null;
+          boolean raw = false;
+          byte[] row = null;
+          String region = null;
+          for (Cell cell : res.listCells()) {
+            row = CellUtil.cloneRow(cell);
+            rows.add(row);
+            String rowStr = Bytes.toString(row);
+            region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
+            if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
+                BackupSystemTable.FAM_COL.length) == 0) {
+              fam = Bytes.toString(CellUtil.cloneValue(cell));
+            } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
+                BackupSystemTable.PATH_COL.length) == 0) {
+              path = Bytes.toString(CellUtil.cloneValue(cell));
+            } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0,
+                BackupSystemTable.STATE_COL.length) == 0) {
+              byte[] state = CellUtil.cloneValue(cell);
+              if (Bytes.equals(BackupSystemTable.BL_PREPARE, state)) {
+                raw = true;
+              } else raw = false;
+            }
+          }
+          if (map.get(tTable) == null) {
+            map.put(tTable, new HashMap<String, Map<String, List<Pair<String, Boolean>>>>());
+            tblMap = map.get(tTable);
+          }
+          if (tblMap.get(region) == null) {
+            tblMap.put(region, new HashMap<String, List<Pair<String, Boolean>>>());
+          }
+          Map<String, List<Pair<String, Boolean>>> famMap = tblMap.get(region);
+          if (famMap.get(fam) == null) {
+            famMap.put(fam, new ArrayList<Pair<String, Boolean>>());
+          }
+          famMap.get(fam).add(new Pair<>(path, raw));
+          LOG.debug("found orig " + path + " for " + fam + " of table " + region);
+        }
+      }
+    }
+    return new Pair<>(map, rows);
+  }
+
+  /*
+   * @param sTableList List of tables
+   * @param maps array of Map of family to List of Paths
+   * @param backupId the backup Id
+   */
+  public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps,
+      String backupId) throws IOException {
+    try (Table table = connection.getTable(tableName)) {
+      long ts = EnvironmentEdgeManager.currentTime();
+      int cnt = 0;
+      List<Put> puts = new ArrayList<>();
+      for (int idx = 0; idx < maps.length; idx++) {
+        Map<byte[], List<Path>> map = maps[idx];
+        TableName tn = sTableList.get(idx);
+        if (map == null) continue;
+        for (Map.Entry<byte[], List<Path>> entry: map.entrySet()) {
+          byte[] fam = entry.getKey();
+          List<Path> paths = entry.getValue();
+          for (Path p : paths) {
+            Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(),
+                backupId, ts, cnt++);
+            puts.add(put);
+          }
+        }
+      }
+      if (!puts.isEmpty()) {
+        table.put(puts);
+      }
+    }
+  }
+
   /**
    * Reads backup status object (instance of backup info) from backup system table table
    * @param backupId backup id
@@ -399,6 +658,21 @@ public final class BackupSystemTable implements Closeable {
 
   }
 
+  /*
+   * Retrieve TableNames of tables covered by completed backups of the given type
+   * @param type backup type
+   * @return List of table names
+   */
+  public List<TableName> getTablesForBackupType(BackupType type) throws IOException {
+    Set<TableName> names = new HashSet<>();
+    List<BackupInfo> infos = getBackupHistory(true);
+    for (BackupInfo info : infos) {
+      if (info.getType() != type) continue;
+      names.addAll(info.getTableNames());
+    }
+    return new ArrayList<>(names);
+  }
+
   /**
    * Get history for backup destination
    * @param backupRoot backup destination path
@@ -1233,6 +1507,119 @@ public final class BackupSystemTable implements Closeable {
     return s.substring(index + 1);
   }
 
+  /*
+   * Creates Puts for a bulk load committed by LoadIncrementalHFiles (postBulkLoadHFile hook)
+   */
+  static List<Put> createPutForCommittedBulkload(TableName table, byte[] region,
+      Map<byte[], List<Path>> finalPaths) {
+    List<Put> puts = new ArrayList<>();
+    for (Map.Entry<byte[], List<Path>> entry : finalPaths.entrySet()) {
+      for (Path path : entry.getValue()) {
+        String file = path.toString();
+        int lastSlash = file.lastIndexOf("/");
+        String filename = file.substring(lastSlash+1);
+        Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
+            Bytes.toString(region), BLK_LD_DELIM, filename));
+        put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
+        put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
+        put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL,
+            file.getBytes());
+        put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT);
+        puts.add(put);
+        LOG.debug("writing done bulk path " + file + " for " + table + " " +
+            Bytes.toString(region));
+      }
+    }
+    return puts;
+  }
+
+  /*
+   * Creates Puts for a bulk load in the prepare phase (preCommitStoreFile hook)
+   */
+  static List<Put> createPutForPreparedBulkload(TableName table, byte[] region,
+      final byte[] family, final List<Pair<Path, Path>> pairs) {
+    List<Put> puts = new ArrayList<>();
+    for (Pair<Path, Path> pair : pairs) {
+      Path path = pair.getSecond();
+      String file = path.toString();
+      int lastSlash = file.lastIndexOf("/");
+      String filename = file.substring(lastSlash+1);
+      Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
+          Bytes.toString(region), BLK_LD_DELIM, filename));
+      put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
+      put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family);
+      put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL,
+          file.getBytes());
+      put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE);
+      puts.add(put);
+      LOG.debug("writing raw bulk path " + file + " for " + table + " " +
+          Bytes.toString(region));
+    }
+    return puts;
+  }
+  public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) {
+    List<Delete> lstDels = new ArrayList<>();
+    for (TableName table : lst) {
+      Delete del = new Delete(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM));
+      del.addFamily(BackupSystemTable.META_FAMILY);
+      lstDels.add(del);
+    }
+    return lstDels;
+  }
+
+  static Scan createScanForOrigBulkLoadedFiles(TableName table) throws IOException {
+    Scan scan = new Scan();
+    byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM);
+    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+    scan.withStartRow(startRow);
+    scan.withStopRow(stopRow);
+    scan.addFamily(BackupSystemTable.META_FAMILY);
+    scan.setMaxVersions(1);
+    return scan;
+  }
+  static String getTableNameFromOrigBulkLoadRow(String rowStr) {
+    String[] parts = rowStr.split(BLK_LD_DELIM);
+    return parts[1];
+  }
+  static String getRegionNameFromOrigBulkLoadRow(String rowStr) {
+    // format is bulk : namespace : table : region : file
+    String[] parts = rowStr.split(BLK_LD_DELIM);
+    int idx = 3;
+    if (parts.length == 4) {
+      // the table is in default namespace
+      idx = 2;
+    }
+    LOG.debug("bulk row string " + rowStr + " region " + parts[idx]);
+    return parts[idx];
+  }
+  /*
+   * Used to query bulk loaded hfiles which have been copied by incremental backup
+   * @param backupId the backup Id. It can be null when querying for all tables
+   * @return the Scan object
+   */
+  static Scan createScanForBulkLoadedFiles(String backupId) throws IOException {
+    Scan scan = new Scan();
+    byte[] startRow = backupId == null ? BULK_LOAD_PREFIX_BYTES :
+      rowkey(BULK_LOAD_PREFIX, backupId+BLK_LD_DELIM);
+    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+    scan.setStartRow(startRow);
+    scan.setStopRow(stopRow);
+    //scan.setTimeRange(lower, Long.MAX_VALUE);
+    scan.addFamily(BackupSystemTable.META_FAMILY);
+    scan.setMaxVersions(1);
+    return scan;
+  }
+
+  static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId,
+      long ts, int idx) {
+    Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId+BLK_LD_DELIM+ts+BLK_LD_DELIM+idx));
+    put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName());
+    put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam);
+    put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, p.getBytes());
+    return put;
+  }
   /**
    * Creates put list for list of WAL files
    * @param files list of WAL file paths
@@ -1364,7 +1751,7 @@ public final class BackupSystemTable implements Closeable {
     return Bytes.toString(data).substring(SET_KEY_PREFIX.length());
   }
 
-  private byte[] rowkey(String s, String... other) {
+  private static byte[] rowkey(String s, String... other) {
     StringBuilder sb = new StringBuilder(s);
     for (String ss : other) {
       sb.append(ss);
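
Two bulk load row key layouts share the "bulk:" prefix introduced above: rows written by the observer hooks are keyed by table, region and hfile name, while rows written after incremental backup has copied an hfile are keyed by backup id, timestamp and a counter. An illustrative sketch (all concrete values are made up):

    public class BulkLoadRowKeySketch {
      public static void main(String[] args) {
        // Written by writePathsPostBulkLoad()/writeFilesForBulkLoadPreCommit():
        // bulk:<table>:<encoded region>:<hfile name>.  For a namespaced table the <table>
        // component itself contains ':', which is why getRegionNameFromOrigBulkLoadRow()
        // checks whether the split yields 4 or 5 parts.
        String origRow = "bulk:" + "ns1:t1" + ":" + "30cfcf2021ad58b2d53e9a12de1c2a32" + ":" + "hfile1";
        // Written by writeBulkLoadedFiles() once incremental backup has copied the hfile:
        // bulk:<backup id>:<timestamp>:<counter>.
        String copiedRow = "bulk:" + "backup_1490741236743" + ":" + 1490741300000L + ":" + 0;
        System.out.println(origRow);
        System.out.println(copiedRow);
      }
    }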

http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 395ed6d..8f6f264 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -18,15 +18,21 @@
 
 package org.apache.hadoop.hbase.backup.impl;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
@@ -40,6 +46,10 @@ import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Incremental backup implementation.
@@ -154,6 +164,118 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     return list;
   }
 
+  static int getIndex(TableName tbl, List<TableName> sTableList) {
+    if (sTableList == null) return 0;
+    for (int i = 0; i < sTableList.size(); i++) {
+      if (tbl.equals(sTableList.get(i))) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  /*
+   * Reads bulk load records from backup table, iterates through the records and forms the paths
+   * for bulk loaded hfiles. Copies the bulk loaded hfiles to the backup destination.
+   * @param sTableList list of tables to be backed up
+   * @return an array, indexed like sTableList, of maps from family to the copied hfile paths
+   */
+  Map<byte[], List<Path>>[] handleBulkLoad(List<TableName> sTableList) throws IOException {
+    Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()];
+    Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair =
+        backupManager.readBulkloadRows(sTableList);
+    Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = pair.getFirst();
+    FileSystem fs = FileSystem.get(conf);
+    FileSystem tgtFs;
+    try {
+      tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
+    } catch (URISyntaxException use) {
+      throw new IOException("Unable to get FileSystem", use);
+    }
+    Path rootdir = FSUtils.getRootDir(conf);
+    Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);
+    for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> tblEntry :
+      map.entrySet()) {
+      TableName srcTable = tblEntry.getKey();
+      int srcIdx = getIndex(srcTable, sTableList);
+      if (srcIdx < 0) {
+        LOG.warn("Couldn't find " + srcTable + " in source table List");
+        continue;
+      }
+      if (mapForSrc[srcIdx] == null) {
+        mapForSrc[srcIdx] = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
+      }
+      Path tblDir = FSUtils.getTableDir(rootdir, srcTable);
+      Path tgtTable = new Path(new Path(tgtRoot, srcTable.getNamespaceAsString()),
+          srcTable.getQualifierAsString());
+      for (Map.Entry<String,Map<String,List<Pair<String, Boolean>>>> regionEntry :
+        tblEntry.getValue().entrySet()){
+        String regionName = regionEntry.getKey();
+        Path regionDir = new Path(tblDir, regionName);
+        // map from family to List of hfiles
+        for (Map.Entry<String,List<Pair<String, Boolean>>> famEntry :
+          regionEntry.getValue().entrySet()) {
+          String fam = famEntry.getKey();
+          Path famDir = new Path(regionDir, fam);
+          List<Path> files;
+          if (!mapForSrc[srcIdx].containsKey(fam.getBytes())) {
+            files = new ArrayList<Path>();
+            mapForSrc[srcIdx].put(fam.getBytes(), files);
+          } else {
+            files = mapForSrc[srcIdx].get(fam.getBytes());
+          }
+          Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
+          String tblName = srcTable.getQualifierAsString();
+          Path tgtFam = new Path(new Path(tgtTable, regionName), fam);
+          if (!tgtFs.mkdirs(tgtFam)) {
+            throw new IOException("couldn't create " + tgtFam);
+          }
+          for (Pair<String, Boolean> fileWithState : famEntry.getValue()) {
+            String file = fileWithState.getFirst();
+            boolean raw = fileWithState.getSecond();
+            int idx = file.lastIndexOf("/");
+            String filename = file;
+            if (idx > 0) {
+              filename = file.substring(idx+1);
+            }
+            Path p = new Path(famDir, filename);
+            Path tgt = new Path(tgtFam, filename);
+            Path archive = new Path(archiveDir, filename);
+            if (fs.exists(p)) {
+              if (LOG.isTraceEnabled()) {
+                LOG.trace("found bulk hfile " + file + " in " + famDir + " for " + tblName);
+              }
+              try {
+                if (LOG.isTraceEnabled()) {
+                  LOG.trace("copying " + p + " to " + tgt);
+                }
+                FileUtil.copy(fs, p, tgtFs, tgt, false, conf);
+              } catch (FileNotFoundException e) {
+                LOG.debug("copying archive " + archive + " to " + tgt);
+                try {
+                  FileUtil.copy(fs, archive, tgtFs, tgt, false, conf);
+                } catch (FileNotFoundException fnfe) {
+                  if (!raw) throw fnfe;
+                }
+              }
+            } else {
+              LOG.debug("copying archive " + archive + " to " + tgt);
+              try {
+                FileUtil.copy(fs, archive, tgtFs, tgt, false, conf);
+              } catch (FileNotFoundException fnfe) {
+                if (!raw) throw fnfe;
+              }
+            }
+            files.add(tgt);
+          }
+        }
+      }
+    }
+    backupManager.writeBulkLoadedFiles(sTableList, mapForSrc);
+    backupManager.removeBulkLoadedRows(sTableList, pair.getSecond());
+    return mapForSrc;
+  }
+
   @Override
   public void execute() throws IOException {
 
@@ -204,6 +326,8 @@ public class IncrementalTableBackupClient extends TableBackupClient {
           BackupUtils.getMinValue(BackupUtils
               .getRSLogTimestampMins(newTableSetTimestampMap));
       backupManager.writeBackupStartCode(newStartCode);
+
+      handleBulkLoad(backupInfo.getTableNames());
       // backup complete
       completeBackup(conn, backupInfo, backupManager, BackupType.INCREMENTAL, conf);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
index f418305..2e4ecce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
@@ -19,9 +19,14 @@
 package org.apache.hadoop.hbase.backup.impl;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.TreeSet;
 
 import org.apache.commons.lang.StringUtils;
@@ -34,10 +39,13 @@ import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.HBackupFileSystem;
 import org.apache.hadoop.hbase.backup.RestoreRequest;
 import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
+import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreJob;
 import org.apache.hadoop.hbase.backup.util.RestoreTool;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
 
 /**
  * Restore table implementation
@@ -50,6 +58,7 @@ public class RestoreTablesClient {
   private Configuration conf;
   private Connection conn;
   private String backupId;
+  private String fullBackupId;
   private TableName[] sTableArray;
   private TableName[] tTableArray;
   private String targetRootDir;
@@ -141,6 +150,7 @@ public class RestoreTablesClient {
     // We need hFS only for full restore (see the code)
     BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId);
     if (manifest.getType() == BackupType.FULL) {
+      fullBackupId = manifest.getBackupImage().getBackupId();
       LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full" + " backup image "
           + tableBackupPath.toString());
       restoreTool.fullRestoreTable(conn, tableBackupPath, sTable, tTable, truncateIfExists,
@@ -170,7 +180,6 @@ public class RestoreTablesClient {
     restoreTool.incrementalRestoreTable(conn, tableBackupPath, paths, new TableName[] { sTable },
       new TableName[] { tTable }, lastIncrBackupId);
     LOG.info(sTable + " has been successfully restored to " + tTable);
-
   }
 
   /**
@@ -185,39 +194,74 @@ public class RestoreTablesClient {
       TableName[] sTableArray, TableName[] tTableArray, boolean isOverwrite) throws IOException {
     TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>();
     boolean truncateIfExists = isOverwrite;
-    try {
-      for (int i = 0; i < sTableArray.length; i++) {
-        TableName table = sTableArray[i];
-        BackupManifest manifest = backupManifestMap.get(table);
-        // Get the image list of this backup for restore in time order from old
-        // to new.
-        List<BackupImage> list = new ArrayList<BackupImage>();
-        list.add(manifest.getBackupImage());
-        TreeSet<BackupImage> set = new TreeSet<BackupImage>(list);
-        List<BackupImage> depList = manifest.getDependentListByTable(table);
-        set.addAll(depList);
-        BackupImage[] arr = new BackupImage[set.size()];
-        set.toArray(arr);
-        restoreImages(arr, table, tTableArray[i], truncateIfExists);
-        restoreImageSet.addAll(list);
-        if (restoreImageSet != null && !restoreImageSet.isEmpty()) {
-          LOG.info("Restore includes the following image(s):");
-          for (BackupImage image : restoreImageSet) {
-            LOG.info("Backup: "
-                + image.getBackupId()
-                + " "
-                + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(),
+    Set<String> backupIdSet = new HashSet<>();
+    for (int i = 0; i < sTableArray.length; i++) {
+      TableName table = sTableArray[i];
+      BackupManifest manifest = backupManifestMap.get(table);
+      // Get the image list of this backup for restore in time order from old
+      // to new.
+      List<BackupImage> list = new ArrayList<BackupImage>();
+      list.add(manifest.getBackupImage());
+      TreeSet<BackupImage> set = new TreeSet<BackupImage>(list);
+      List<BackupImage> depList = manifest.getDependentListByTable(table);
+      set.addAll(depList);
+      BackupImage[] arr = new BackupImage[set.size()];
+      set.toArray(arr);
+      restoreImages(arr, table, tTableArray[i], truncateIfExists);
+      restoreImageSet.addAll(list);
+      if (restoreImageSet != null && !restoreImageSet.isEmpty()) {
+        LOG.info("Restore includes the following image(s):");
+        for (BackupImage image : restoreImageSet) {
+          LOG.info("Backup: "
+              + image.getBackupId()
+              + " "
+              + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(),
                   table));
+          if (image.getType() == BackupType.INCREMENTAL) {
+            backupIdSet.add(image.getBackupId());
+            LOG.debug("adding " + image.getBackupId() + " for bulk load");
+          }
+        }
+      }
+    }
+    try (BackupSystemTable table = new BackupSystemTable(conn)) {
+      List<TableName> sTableList = Arrays.asList(sTableArray);
+      for (String id : backupIdSet) {
+        LOG.debug("restoring bulk load for " + id);
+        Map<byte[], List<Path>>[] mapForSrc = table.readBulkLoadedFiles(id, sTableList);
+        Map<LoadQueueItem, ByteBuffer> loaderResult;
+        conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
+        LoadIncrementalHFiles loader = MapReduceRestoreJob.createLoader(conf);
+        for (int i = 0; i < sTableList.size(); i++) {
+          if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) {
+            loaderResult = loader.run(null, mapForSrc[i], tTableArray[i]);
+            LOG.debug("bulk loading " + sTableList.get(i) + " to " + tTableArray[i]);
+            if (loaderResult.isEmpty()) {
+              String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " +tTableArray[i];
+              LOG.error(msg);
+              throw new IOException(msg);
+            }
           }
         }
       }
-    } catch (Exception e) {
-      LOG.error("Failed", e);
-      throw new IOException(e);
     }
     LOG.debug("restoreStage finished");
   }
 
+  static long getTsFromBackupId(String backupId) {
+    if (backupId == null) {
+      return 0;
+    }
+    return Long.parseLong(backupId.substring(backupId.lastIndexOf("_")+1));
+  }
+
+  static boolean withinRange(long a, long lower, long upper) {
+    if (a < lower || a > upper) {
+      return false;
+    }
+    return true;
+  }
+
   public void execute() throws IOException {
 
     // case VALIDATION:

http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
index ffb61ec..9bafe12 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
@@ -98,7 +98,7 @@ public class MapReduceRestoreJob implements RestoreJob {
         result = player.run(playerArgs);
         if (succeeded(result)) {
           // do bulk load
-          LoadIncrementalHFiles loader = createLoader();
+          LoadIncrementalHFiles loader = createLoader(getConf());
           if (LOG.isDebugEnabled()) {
             LOG.debug("Restoring HFiles from directory " + bulkOutputPath);
           }
@@ -134,13 +134,13 @@ public class MapReduceRestoreJob implements RestoreJob {
     return result == 0;
   }
 
-  private LoadIncrementalHFiles createLoader() throws IOException {
+  public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException {
     // set configuration for restore:
     // LoadIncrementalHFile needs more time
     // <name>hbase.rpc.timeout</name> <value>600000</value>
     // calculates
     Integer milliSecInHour = 3600000;
-    Configuration conf = new Configuration(getConf());
+    Configuration conf = new Configuration(config);
     conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour);
 
     // By default, it is 32 and loader will fail if # of files in any region exceed this

http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index f59e24c..80dfd66 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -62,6 +63,9 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Admin;
@@ -144,7 +148,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     initialize();
   }
 
-  private void initialize() throws Exception {
+  private void initialize() throws IOException {
     if (initalized) {
       return;
     }
@@ -282,6 +286,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     public String toString() {
       return "family:"+ Bytes.toString(family) + " path:" + hfilePath.toString();
     }
+
+    public byte[] getFamily() {
+      return family;
+    }
+
+    public Path getFilePath() {
+      return hfilePath;
+    }
   }
 
   /*
@@ -1184,7 +1196,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * If the table is created for the first time, then "completebulkload" reads the files twice.
    * More modifications necessary if we want to avoid doing it.
    */
-  private void createTable(TableName tableName, String dirPath, Admin admin) throws Exception {
+  private void createTable(TableName tableName, String dirPath, Admin admin) throws IOException {
     final Path hfofDir = new Path(dirPath);
     final FileSystem fs = hfofDir.getFileSystem(getConf());
 
@@ -1238,7 +1250,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   }
 
   public Map<LoadQueueItem, ByteBuffer> run(String dirPath, Map<byte[], List<Path>> map,
-      TableName tableName) throws Exception{
+      TableName tableName) throws IOException {
     initialize();
     try (Connection connection = ConnectionFactory.createConnection(getConf());
         Admin admin = connection.getAdmin()) {
@@ -1261,7 +1273,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       try (Table table = connection.getTable(tableName);
         RegionLocator locator = connection.getRegionLocator(tableName)) {
         boolean silence = "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
-        boolean copyFiles = "yes".equalsIgnoreCase(getConf().get(ALWAYS_COPY_FILES, ""));
+        boolean copyFiles = getConf().getBoolean(ALWAYS_COPY_FILES, false);
         if (dirPath != null) {
           doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles);
         } else {
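
The new LoadQueueItem accessors above expose the family and HFile path of each loaded item,
presumably so backup code can record what was bulk loaded and where. A rough sketch of walking
the items returned by run(); the loader and family-to-files map are assumed to be set up as in
the previous sketch, and the table name is a placeholder.

    import java.nio.ByteBuffer;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
    import org.apache.hadoop.hbase.util.Bytes;

    public class LoadedItemsSketch {
      // Logs family/path for every item that was loaded.
      static void logLoadedItems(LoadIncrementalHFiles loader,
          Map<byte[], List<Path>> family2Files) throws Exception {
        Map<LoadQueueItem, ByteBuffer> loaded =
            loader.run(null, family2Files, TableName.valueOf("table1"));
        for (LoadQueueItem item : loaded.keySet()) {
          System.out.println("family=" + Bytes.toString(item.getFamily())
              + " hfile=" + item.getFilePath());
        }
      }
    }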

http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
index faae4ef..8e3e105 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
@@ -167,4 +167,20 @@ public class HFileArchiveUtil {
   private static Path getArchivePath(final Path rootdir) {
     return new Path(rootdir, HConstants.HFILE_ARCHIVE_DIRECTORY);
   }
+  
+  /**
+   * @return table name derived from the archive file path, or null if it cannot be resolved
+   */
+  public static TableName getTableName(Path archivePath) {
+    Path p = archivePath;
+    String tbl = null;
+    // namespace is the 4th parent of file
+    for (int i = 0; i < 5; i++) {
+      if (p == null) return null;
+      if (i == 3) tbl = p.getName();
+      p = p.getParent();
+    }
+    if (p == null) return null;
+    return TableName.valueOf(p.getName(), tbl);
+  }
 }
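
A hedged illustration (not code from the patch) of the kind of use the new helper enables: a
cleaner-style component deciding which archived files belong to tables that still have
bulk-load references. The set of "protected" tables is a placeholder.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.util.HFileArchiveUtil;

    public class ArchiveFilterSketch {
      // Keeps only the archived files whose owning table is NOT in the protected set.
      static List<FileStatus> filterDeletable(Iterable<FileStatus> candidates,
          Set<TableName> tablesWithBulkLoadRefs) {
        List<FileStatus> deletable = new ArrayList<>();
        for (FileStatus file : candidates) {
          TableName tn = HFileArchiveUtil.getTableName(file.getPath());
          if (tn == null || !tablesWithBulkLoadRefs.contains(tn)) {
            deletable.add(file);
          }
        }
        return deletable;
      }
    }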

http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index ec88549..e6bd73e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -49,6 +49,10 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.SecureTestUtil;
 import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -88,6 +92,7 @@ public class TestBackupBase {
   protected static String BACKUP_ROOT_DIR = "/backupUT";
   protected static String BACKUP_REMOTE_ROOT_DIR = "/backupUT";
   protected static String provider = "defaultProvider";
+  protected static boolean secure = false;
 
   /**
    * @throws java.lang.Exception
@@ -96,6 +101,16 @@ public class TestBackupBase {
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL = new HBaseTestingUtility();
     conf1 = TEST_UTIL.getConfiguration();
+    if (secure) {
+      // set the always on security provider
+      UserProvider.setUserProviderForTesting(TEST_UTIL.getConfiguration(),
+          HadoopSecurityEnabledUserProviderForTesting.class);
+      // setup configuration
+      SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration());
+    }
+    String coproc = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
+    conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, (coproc == null ? "" : coproc + ",") +
+        BackupObserver.class.getName());
     conf1.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
     BackupManager.decorateMasterConfiguration(conf1);
     BackupManager.decorateRegionServerConfiguration(conf1);
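
Outside of the test harness, the same wiring comes down to the two configuration changes shown
above; a minimal sketch using the same constants (whether a real deployment needs anything
beyond this is not covered by this excerpt):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.backup.BackupObserver;
    import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
    import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;

    public class EnableBackupConfSketch {
      public static Configuration enableBackup(Configuration conf) {
        // Register BackupObserver so bulk loads are visible to the backup machinery
        String coproc = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
        conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
            (coproc == null ? "" : coproc + ",") + BackupObserver.class.getName());
        // Turn the backup feature on
        conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
        return conf;
      }
    }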

http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupHFileCleaner.java
new file mode 100644
index 0000000..dfbe106
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupHFileCleaner.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, SmallTests.class })
+public class TestBackupHFileCleaner {
+  private static final Log LOG = LogFactory.getLog(TestBackupHFileCleaner.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static Configuration conf = TEST_UTIL.getConfiguration();
+  private static TableName tableName = TableName.valueOf("backup.hfile.cleaner");
+  private static String famName = "fam";
+  static FileSystem fs = null;
+  Path root;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
+    TEST_UTIL.startMiniZKCluster();
+    TEST_UTIL.startMiniCluster(1);
+    fs = FileSystem.get(conf);
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    if (fs != null) {
+      fs.close();
+    }
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setup() throws IOException {
+    root = TEST_UTIL.getDataTestDirOnTestFS();
+  }
+
+  @After
+  public void cleanup() {
+    try {
+      fs.delete(root, true);
+    } catch (IOException e) {
+      LOG.warn("Failed to delete files recursively from path " + root);
+    }
+  }
+
+  @Test
+  public void testGetDeletableFiles() throws IOException {
+    // 1. Create a file
+    Path file = new Path(root, "testIsFileDeletableWithNoHFileRefs");
+    fs.createNewFile(file);
+    // 2. Assert file is successfully created
+    assertTrue("Test file not created!", fs.exists(file));
+    BackupHFileCleaner cleaner = new BackupHFileCleaner();
+    cleaner.setConf(conf);
+    cleaner.setCheckForFullyBackedUpTables(false);
+    // 3. Assert that file as is should be deletable
+    List<FileStatus> stats = new ArrayList<>();
+    FileStatus stat = fs.getFileStatus(file);
+    stats.add(stat);
+    Iterable<FileStatus> deletable = cleaner.getDeletableFiles(stats);
+    deletable = cleaner.getDeletableFiles(stats);
+    boolean found = false;
+    for (FileStatus stat1 : deletable) {
+      if (stat.equals(stat1)) found = true;
+    }
+    assertTrue("Cleaner should allow this file to be deleted as there is no hfile reference "
+        + "to it.", found);
+
+    // 4. Add the file as bulk load
+    List<Path> list = new ArrayList<>(1);
+    list.add(file);
+    try (Connection conn = ConnectionFactory.createConnection(conf);
+        BackupSystemTable sysTbl = new BackupSystemTable(conn)) {
+      List<TableName> sTableList = new ArrayList<>();
+      sTableList.add(tableName);
+      Map<byte[], List<Path>>[] maps = new Map[1];
+      maps[0] = new HashMap<>();
+      maps[0].put(famName.getBytes(), list);
+      sysTbl.writeBulkLoadedFiles(sTableList, maps, "1");
+    }
+
+    // 5. Assert file should not be deletable
+    deletable = cleaner.getDeletableFiles(stats);
+    deletable = cleaner.getDeletableFiles(stats);
+    found = false;
+    for (FileStatus stat1 : deletable) {
+      if (stat.equals(stat1)) found = true;
+    }
+    assertFalse("Cleaner should not allow this file to be deleted as there is an hfile reference "
+        + "to it.", found);
+  }
+}
\ No newline at end of file
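
For completeness, a sketch of how the new cleaner could be plugged into the master's HFile
cleaner chain on a real cluster. Whether BackupManager.decorateMasterConfiguration() already
does this automatically is not shown in this excerpt, so treat the need for manual wiring as an
assumption; "hbase.master.hfilecleaner.plugins" is the standard plugin list key.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.backup.BackupHFileCleaner;

    public class RegisterCleanerSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Append BackupHFileCleaner to the master's HFile cleaner plugins
        String plugins = conf.get("hbase.master.hfilecleaner.plugins", "");
        conf.set("hbase.master.hfilecleaner.plugins",
            plugins.isEmpty() ? BackupHFileCleaner.class.getName()
                : plugins + "," + BackupHFileCleaner.class.getName());
      }
    }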

http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
new file mode 100644
index 0000000..c10ec40
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.TestLoadIncrementalHFiles;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import com.google.common.collect.Lists;
+
+/**
+ * 1. Create table t1
+ * 2. Load data to t1
+ * 3. Full backup t1
+ * 4. Load data to t1
+ * 5. Bulk load into t1
+ * 6. Incremental backup t1
+ */
+@Category(LargeTests.class)
+@RunWith(Parameterized.class)
+public class TestIncrementalBackupWithBulkLoad extends TestBackupBase {
+  private static final Log LOG = LogFactory.getLog(TestIncrementalBackupWithBulkLoad.class);
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    secure = true;
+    List<Object[]> params = new ArrayList<Object[]>();
+    params.add(new Object[] {Boolean.TRUE});
+    return params;
+  }
+
+  public TestIncrementalBackupWithBulkLoad(Boolean b) {
+  }
+  // Implement all test cases in one test since incremental backup/restore steps are interdependent
+  @Test
+  public void TestIncBackupWithBulkLoad() throws Exception {
+    String testName = "TestIncBackupWithBulkLoad";
+    // #1 - create full backup for table1
+    LOG.info("create full backup image for table1");
+
+    List<TableName> tables = Lists.newArrayList(table1);
+    HBaseAdmin admin = null;
+    Connection conn = ConnectionFactory.createConnection(conf1);
+    admin = (HBaseAdmin) conn.getAdmin();
+    BackupAdminImpl client = new BackupAdminImpl(conn);
+
+    BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
+    String backupIdFull = client.backupTables(request);
+
+    assertTrue(checkSucceeded(backupIdFull));
+
+    // #2 - insert some data into table1
+    HTable t1 = (HTable) conn.getTable(table1);
+    Put p1;
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      p1 = new Put(Bytes.toBytes("row-t1" + i));
+      p1.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+      t1.put(p1);
+    }
+
+    Assert.assertEquals(NB_ROWS_IN_BATCH * 2, TEST_UTIL.countRows(t1));
+    t1.close();
+
+    int NB_ROWS2 = 20;
+    LOG.debug("bulk loading into " + testName);
+    int actual = TestLoadIncrementalHFiles.loadHFiles(testName, table1Desc, TEST_UTIL, famName,
+        qualName, false, null, new byte[][][] {
+      new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+      new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
+    }, true, false, true, NB_ROWS_IN_BATCH*2, NB_ROWS2);
+
+    // #3 - incremental backup for table1
+    tables = Lists.newArrayList(table1);
+    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
+    String backupIdIncMultiple = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple));
+
+    // #4 - check tables for full restore
+    HBaseAdmin hAdmin = TEST_UTIL.getHBaseAdmin();
+
+    // #5 - restore incremental backup for table1
+    TableName[] tablesRestoreIncMultiple = new TableName[] { table1 };
+    TableName[] tablesMapIncMultiple = new TableName[] { table1_restore };
+    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple,
+      false, tablesRestoreIncMultiple, tablesMapIncMultiple, true));
+
+    HTable hTable = (HTable) conn.getTable(table1_restore);
+    Assert.assertEquals(NB_ROWS_IN_BATCH * 2 + actual, TEST_UTIL.countRows(hTable));
+    request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
+
+    backupIdFull = client.backupTables(request);
+    try (final BackupSystemTable table = new BackupSystemTable(conn)) {
+      Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair
+      = table.readBulkloadRows(tables);
+      assertTrue("map still has " + pair.getSecond().size() + " entries",
+          pair.getSecond().isEmpty());
+    }
+    assertTrue(checkSucceeded(backupIdFull));
+
+    hTable.close();
+    admin.close();
+    conn.close();
+  }
+
+}
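
The final assertion above is easy to reuse as a standalone check; a small sketch (class and
method names are made up) of asking the backup system table whether any bulk-load entries are
still pending for a set of tables:

    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.util.Pair;

    public class PendingBulkLoadsSketch {
      // Returns true if the backup system table still tracks bulk-loaded files for these tables.
      static boolean hasPendingBulkLoads(Connection conn, List<TableName> tables) throws Exception {
        try (BackupSystemTable sysTable = new BackupSystemTable(conn)) {
          Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair =
              sysTable.readBulkloadRows(tables);
          return !pair.getSecond().isEmpty();
        }
      }
    }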

http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index a6dacf7..7ae5afc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -308,13 +308,14 @@ public class TestLoadIncrementalHFiles {
     runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap, false);
   }
 
-  private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
-      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
-      boolean copyFiles) throws Exception {
+  public static int loadHFiles(String testName, HTableDescriptor htd, HBaseTestingUtility util,
+      byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
+      byte[][][] hfileRanges, boolean useMap, boolean deleteFile,
+      boolean copyFiles, int initRowCount, int factor) throws Exception {
     Path dir = util.getDataTestDirOnTestFS(testName);
     FileSystem fs = util.getTestFileSystem();
     dir = dir.makeQualified(fs);
-    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+    Path familyDir = new Path(dir, Bytes.toString(fam));
 
     int hfileIdx = 0;
     Map<byte[], List<Path>> map = null;
@@ -324,26 +325,26 @@ public class TestLoadIncrementalHFiles {
     }
     if (useMap) {
       map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-      map.put(FAMILY, list);
+      map.put(fam, list);
     }
     Path last = null;
     for (byte[][] range : hfileRanges) {
       byte[] from = range[0];
       byte[] to = range[1];
       Path path = new Path(familyDir, "hfile_" + hfileIdx++);
-      HFileTestUtil.createHFile(util.getConfiguration(), fs, path, FAMILY, QUALIFIER, from, to, 1000);
+      HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, factor);
       if (useMap) {
         last = path;
         list.add(path);
       }
     }
-    int expectedRows = hfileIdx * 1000;
+    int expectedRows = hfileIdx * factor;
 
-    if (preCreateTable || map != null) {
+    final TableName tableName = htd.getTableName();
+    if (!util.getHBaseAdmin().tableExists(tableName) && (preCreateTable || map != null)) {
       util.getAdmin().createTable(htd, tableSplitKeys);
     }
 
-    final TableName tableName = htd.getTableName();
     Configuration conf = util.getConfiguration();
     if (copyFiles) {
       conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
@@ -351,12 +352,14 @@ public class TestLoadIncrementalHFiles {
     LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
     String [] args= {dir.toString(), tableName.toString()};
     if (useMap) {
-      fs.delete(last);
+      if (deleteFile) fs.delete(last);
       Map<LoadQueueItem, ByteBuffer> loaded = loader.run(null, map, tableName);
-      expectedRows -= 1000;
-      for (LoadQueueItem item : loaded.keySet()) {
-        if (item.hfilePath.getName().equals(last.getName())) {
-          fail(last + " should be missing");
+      if (deleteFile) {
+        expectedRows -= factor;
+        for (LoadQueueItem item : loaded.keySet()) {
+          if (item.hfilePath.getName().equals(last.getName())) {
+            fail(last + " should be missing");
+          }
         }
       }
     } else {
@@ -365,19 +368,30 @@ public class TestLoadIncrementalHFiles {
 
     if (copyFiles) {
       for (Path p : list) {
-        assertTrue(fs.exists(p));
+        assertTrue(p + " should exist", fs.exists(p));
       }
     }
 
     Table table = util.getConnection().getTable(tableName);
     try {
-      assertEquals(expectedRows, util.countRows(table));
+      assertEquals(initRowCount + expectedRows, util.countRows(table));
     } finally {
       table.close();
     }
 
+    return expectedRows;
+  }
+
+  private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
+      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
+      boolean copyFiles) throws Exception {
+    loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys,
+        hfileRanges, useMap, true, copyFiles, 0, 1000);
+
+    final TableName tableName = htd.getTableName();
     // verify staging folder has been cleaned up
     Path stagingBasePath = new Path(FSUtils.getRootDir(util.getConfiguration()), HConstants.BULKLOAD_STAGING_DIR_NAME);
+    FileSystem fs = util.getTestFileSystem();
     if(fs.exists(stagingBasePath)) {
       FileStatus[] files = fs.listStatus(stagingBasePath);
       for(FileStatus file : files) {