Posted to commits@hbase.apache.org by te...@apache.org on 2016/09/06 15:49:28 UTC

hbase git commit: HBASE-15449 HBase Backup Phase 3: Support physical table layout change

Repository: hbase
Updated Branches:
  refs/heads/HBASE-7912 6d48260c8 -> 28737d05f


HBASE-15449 HBase Backup Phase 3: Support physical table layout change


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/28737d05
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/28737d05
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/28737d05

Branch: refs/heads/HBASE-7912
Commit: 28737d05f4c336253dd161743accdad31f30efcc
Parents: 6d48260
Author: tedyu <yu...@gmail.com>
Authored: Tue Sep 6 08:48:58 2016 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Tue Sep 6 08:48:58 2016 -0700

----------------------------------------------------------------------
 .../hbase/backup/impl/RestoreClientImpl.java    |  29 +-
 .../hbase/backup/util/BackupServerUtil.java     |   3 +-
 .../hbase/backup/util/RestoreServerUtil.java    | 274 ++++++++++++-------
 .../hbase/mapreduce/LoadIncrementalHFiles.java  |  60 +++-
 .../hadoop/hbase/backup/TestBackupBase.java     |  14 +-
 .../hbase/backup/TestIncrementalBackup.java     |  54 +++-
 6 files changed, 302 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/28737d05/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java
index a04fc08..7f23ce0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java
@@ -261,35 +261,40 @@ public final class RestoreClientImpl implements RestoreClient {
 
     // TODO: convert feature will be provided in a future JIRA
     boolean converted = false;
+    String lastIncrBackupId = null;
+    List<String> logDirList = null;
 
+    // Scan incremental backups
+    if (it.hasNext()) {
+      // obtain the backupId for most recent incremental
+      logDirList = new ArrayList<String>();
+      while (it.hasNext()) {
+        BackupImage im = it.next();
+        String logBackupDir = HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId());
+        logDirList.add(logBackupDir);
+        lastIncrBackupId = im.getBackupId();
+      }
+    }
     if (manifest.getType() == BackupType.FULL || converted) {
       LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from "
           + (converted ? "converted" : "full") + " backup image " + tableBackupPath.toString());
       restoreTool.fullRestoreTable(tableBackupPath, sTable, tTable, 
-        converted, truncateIfExists);
-      
+        converted, truncateIfExists, lastIncrBackupId);
     } else { // incremental Backup
       throw new IOException("Unexpected backup type " + image.getType());
     }
 
     // The rest one are incremental
-    if (it.hasNext()) {
-      List<String> logDirList = new ArrayList<String>();
-      while (it.hasNext()) {
-        BackupImage im = it.next();
-        String logBackupDir = HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId());
-        logDirList.add(logBackupDir);
-      }
+    if (logDirList != null) {
       String logDirs = StringUtils.join(logDirList, ",");
       LOG.info("Restoring '" + sTable + "' to '" + tTable
           + "' from log dirs: " + logDirs);
       String[] sarr = new String[logDirList.size()];
       logDirList.toArray(sarr);
       Path[] paths = org.apache.hadoop.util.StringUtils.stringToPath(sarr);
-      restoreTool.incrementalRestoreTable(paths, new TableName[] { sTable },
-        new TableName[] { tTable });
+      restoreTool.incrementalRestoreTable(tableBackupPath, paths, new TableName[] { sTable },
+        new TableName[] { tTable }, lastIncrBackupId);
     }
     LOG.info(sTable + " has been successfully restored to " + tTable);
   }
-  
 }
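
Read end to end, the change above reorders the restore flow: the incremental backup images are scanned first, so the id of the most recent incremental backup is already known when the full image is restored and can be reused when the collected WAL directories are replayed. The following is a condensed, hypothetical restatement of that sequence, with the surrounding fields passed in as parameters, the unsupported "converted" path dropped, and the BackupImage import location assumed from this branch's layout.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
// assumed location of BackupImage in this branch
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
import org.apache.hadoop.hbase.backup.util.RestoreServerUtil;

final class RestoreSequenceSketch {
  /** Restores one full image and then replays all incremental images for a single table. */
  static void restore(RestoreServerUtil restoreTool, Path tableBackupPath,
      Iterator<BackupImage> it, TableName sTable, TableName tTable,
      boolean truncateIfExists) throws IOException {
    // 1. Walk the incremental images first so the id of the most recent one
    //    is known before the full image is restored.
    String lastIncrBackupId = null;
    List<String> logDirList = null;
    if (it.hasNext()) {
      logDirList = new ArrayList<String>();
      while (it.hasNext()) {
        BackupImage im = it.next();
        logDirList.add(HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId()));
        lastIncrBackupId = im.getBackupId();
      }
    }
    // 2. Full restore now receives the last incremental backup id, so the target
    //    table can be created with the newest recorded schema.
    restoreTool.fullRestoreTable(tableBackupPath, sTable, tTable,
        false /* converted */, truncateIfExists, lastIncrBackupId);
    // 3. Replay the WALs of every incremental image on top of the full restore.
    if (logDirList != null) {
      Path[] paths = new Path[logDirList.size()];
      for (int i = 0; i < paths.length; i++) {
        paths[i] = new Path(logDirList.get(i));
      }
      restoreTool.incrementalRestoreTable(tableBackupPath, paths,
          new TableName[] { sTable }, new TableName[] { tTable }, lastIncrBackupId);
    }
  }
}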

http://git-wip-us.apache.org/repos/asf/hbase/blob/28737d05/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupServerUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupServerUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupServerUtil.java
index 265ef6c..37c8d65 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupServerUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupServerUtil.java
@@ -172,7 +172,6 @@ public final class BackupServerUtil {
           LOG.warn("Table "+ table+" does not exists, skipping it.");
           continue;
         }
-        LOG.debug("Attempting to copy table info for:" + table);
         TableDescriptor orig = FSTableDescriptors.getTableDescriptorFromFs(fs, rootDir, table);
 
         // write a copy of descriptor to the target directory
@@ -181,6 +180,8 @@ public final class BackupServerUtil {
         FSTableDescriptors descriptors =
             new FSTableDescriptors(conf, targetFs, FSUtils.getRootDir(conf));
         descriptors.createTableDescriptorForTableDirectory(target, orig, false);
+        LOG.debug("Attempting to copy table info for:" + table + " target: " + target +
+            " descriptor: " + orig);
         LOG.debug("Finished copying tableinfo.");
         List<HRegionInfo> regions = null;
         regions = admin.getTableRegions(table);

http://git-wip-us.apache.org/repos/asf/hbase/blob/28737d05/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
index 6f83f25..620d622 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
@@ -20,8 +20,11 @@ package org.apache.hadoop.hbase.backup.util;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
@@ -31,9 +34,11 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupRestoreServerFactory;
 import org.apache.hadoop.hbase.backup.HBackupFileSystem;
 import org.apache.hadoop.hbase.backup.IncrementalRestoreService;
@@ -55,6 +60,8 @@ import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * A collection for methods used by multiple classes to restore HBase tables.
@@ -133,21 +140,54 @@ public class RestoreServerUtil {
     return regionDirList;
   }
 
+  static void modifyTableSync(Admin admin, HTableDescriptor desc)
+      throws IOException {
+    admin.modifyTable(desc.getTableName(), desc);
+    Pair<Integer, Integer> status = new Pair<Integer, Integer>() {{
+      setFirst(0);
+      setSecond(0);
+    }};
+    int i = 0;
+    do {
+      status = admin.getAlterStatus(desc.getTableName());
+      if (status.getSecond() != 0) {
+        LOG.debug(status.getSecond() - status.getFirst() + "/" + status.getSecond()
+          + " regions updated.");
+        try {
+          Thread.sleep(1 * 1000l);
+        } catch (InterruptedException ie) {
+          InterruptedIOException iie = new InterruptedIOException();
+          iie.initCause(ie);
+          throw iie;
+        }
+      } else {
+        LOG.debug("All regions updated.");
+        break;
+      }
+    } while (status.getFirst() != 0 && i++ < 500);
+    if (status.getFirst() != 0) {
+      throw new IOException("Failed to update all regions even after 500 seconds.");
+    }
+  }
+
   /**
    * During incremental backup operation. Call WalPlayer to replay WAL in backup image Currently
    * tableNames and newTablesNames only contain single table, will be expanded to multiple tables in
    * the future
-   * @param logDir : incremental backup folders, which contains WAL
+   * @param tableBackupPath backup path
+   * @param logDirs : incremental backup folders, which contains WAL
    * @param tableNames : source tableNames(table names were backuped)
    * @param newTableNames : target tableNames(table names to be restored to)
+   * @param incrBackupId incremental backup Id
    * @throws IOException exception
    */
-  public void incrementalRestoreTable(Path[] logDirs,
-      TableName[] tableNames, TableName[] newTableNames) throws IOException {
+  public void incrementalRestoreTable(Path tableBackupPath, Path[] logDirs, TableName[] tableNames,
+      TableName[] newTableNames, String incrBackupId) throws IOException {
 
     if (tableNames.length != newTableNames.length) {
       throw new IOException("Number of source tables and target tables does not match!");
     }
+    FileSystem fileSys = tableBackupPath.getFileSystem(this.conf);
 
     // for incremental backup image, expect the table already created either by user or previous
     // full backup. Here, check that all new tables exists
@@ -160,6 +200,35 @@ public class RestoreServerUtil {
             + " does not exist. Create the table first, e.g. by restoring a full backup.");
         }
       }
+      // adjust table schema
+      for (int i = 0; i < tableNames.length; i++) {
+        TableName tableName = tableNames[i];
+        HTableDescriptor tableDescriptor = getTableDescriptor(fileSys, tableName, incrBackupId);
+        LOG.debug("Found descriptor " + tableDescriptor + " through " + incrBackupId);
+
+        TableName newTableName = newTableNames[i];
+        HTableDescriptor newTableDescriptor = admin.getTableDescriptor(newTableName);
+        List<HColumnDescriptor> families = Arrays.asList(tableDescriptor.getColumnFamilies());
+        List<HColumnDescriptor> existingFamilies =
+            Arrays.asList(newTableDescriptor.getColumnFamilies());
+        boolean schemaChangeNeeded = false;
+        for (HColumnDescriptor family : families) {
+          if (!existingFamilies.contains(family)) {
+            newTableDescriptor.addFamily(family);
+            schemaChangeNeeded = true;
+          }
+        }
+        for (HColumnDescriptor family : existingFamilies) {
+          if (!families.contains(family)) {
+            newTableDescriptor.removeFamily(family.getName());
+            schemaChangeNeeded = true;
+          }
+        }
+        if (schemaChangeNeeded) {
+          RestoreServerUtil.modifyTableSync(admin, newTableDescriptor);
+          LOG.info("Changed " + newTableDescriptor.getTableName() + " to: " + newTableDescriptor);
+        }
+      }
       IncrementalRestoreService restoreService =
           BackupRestoreServerFactory.getIncrementalRestoreService(conf);
 
@@ -168,8 +237,9 @@ public class RestoreServerUtil {
   }
 
   public void fullRestoreTable(Path tableBackupPath, TableName tableName, TableName newTableName,
-      boolean converted, boolean truncateIfExists) throws IOException {
-    restoreTableAndCreate(tableName, newTableName, tableBackupPath, converted, truncateIfExists);
+      boolean converted, boolean truncateIfExists, String lastIncrBackupId) throws IOException {
+    restoreTableAndCreate(tableName, newTableName, tableBackupPath, converted, truncateIfExists,
+        lastIncrBackupId);
   }
 
   /**
@@ -275,104 +345,126 @@ public class RestoreServerUtil {
     return tableArchivePath;
   }
 
+  private HTableDescriptor getTableDescriptor(FileSystem fileSys, TableName tableName,
+      String lastIncrBackupId) throws IOException {
+    if (lastIncrBackupId != null) {
+      String target = BackupClientUtil.getTableBackupDir(backupRootPath.toString(),
+          lastIncrBackupId, tableName);
+      // Path target = new Path(info.getBackupStatus(tableName).getTargetDir());
+      return FSTableDescriptors.getTableDescriptorFromFs(fileSys,
+          new Path(target)).getHTableDescriptor();
+    }
+    return null;
+  }
+
   private void restoreTableAndCreate(TableName tableName, TableName newTableName,
-      Path tableBackupPath, boolean converted, boolean truncateIfExists) throws IOException {
+      Path tableBackupPath, boolean converted, boolean truncateIfExists, String lastIncrBackupId)
+          throws IOException {
     if (newTableName == null || newTableName.equals("")) {
       newTableName = tableName;
     }
 
     FileSystem fileSys = tableBackupPath.getFileSystem(this.conf);
 
-    // get table descriptor first
-    HTableDescriptor tableDescriptor = null;
-
-    Path tableSnapshotPath = getTableSnapshotPath(backupRootPath, tableName, backupId);
+    HTableDescriptor tableDescriptor = getTableDescriptor(fileSys, tableName, lastIncrBackupId);
+    if (tableDescriptor != null) {
+      LOG.debug("Retrieved descriptor: " + tableDescriptor + " thru " + lastIncrBackupId);
+    }
 
-    if (fileSys.exists(tableSnapshotPath)) {
-      // snapshot path exist means the backup path is in HDFS
-      // check whether snapshot dir already recorded for target table
-      if (snapshotMap.get(tableName) != null) {
-        SnapshotDescription desc =
-            SnapshotDescriptionUtils.readSnapshotInfo(fileSys, tableSnapshotPath);
-        SnapshotManifest manifest = SnapshotManifest.open(conf, fileSys, tableSnapshotPath, desc);
-        tableDescriptor = manifest.getTableDescriptor();
-      } else {
-        tableDescriptor = getTableDesc(tableName);
-        snapshotMap.put(tableName, getTableInfoPath(tableName));
-      }
+    try (Connection conn = ConnectionFactory.createConnection(conf);
+        HBaseAdmin hbadmin = (HBaseAdmin) conn.getAdmin();) {
       if (tableDescriptor == null) {
-        LOG.debug("Found no table descriptor in the snapshot dir, previous schema would be lost");
+        Path tableSnapshotPath = getTableSnapshotPath(backupRootPath, tableName, backupId);
+        if (fileSys.exists(tableSnapshotPath)) {
+          // snapshot path exist means the backup path is in HDFS
+          // check whether snapshot dir already recorded for target table
+          if (snapshotMap.get(tableName) != null) {
+            SnapshotDescription desc =
+                SnapshotDescriptionUtils.readSnapshotInfo(fileSys, tableSnapshotPath);
+            SnapshotManifest manifest = SnapshotManifest.open(conf,fileSys,tableSnapshotPath,desc);
+            tableDescriptor = manifest.getTableDescriptor();
+            LOG.debug("obtained descriptor from " + manifest);
+          } else {
+            tableDescriptor = getTableDesc(tableName);
+            snapshotMap.put(tableName, getTableInfoPath(tableName));
+            LOG.debug("obtained descriptor from snapshot for " + tableName);
+          }
+          if (tableDescriptor == null) {
+            LOG.debug("Found no table descriptor in the snapshot dir, previous schema was lost");
+          }
+        } else if (converted) {
+          // first check if this is a converted backup image
+          LOG.error("convert will be supported in a future jira");
+        }
       }
-    } else if (converted) {
-      // first check if this is a converted backup image
-      LOG.error("convert will be supported in a future jira");
-    }
 
-    Path tableArchivePath = getTableArchivePath(tableName);
-    if (tableArchivePath == null) {
-      if (tableDescriptor != null) {
-        // find table descriptor but no archive dir means the table is empty, create table and exit
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("find table descriptor but no archive dir for table " + tableName
-            + ", will only create table");
+      Path tableArchivePath = getTableArchivePath(tableName);
+      if (tableArchivePath == null) {
+        if (tableDescriptor != null) {
+          // find table descriptor but no archive dir => the table is empty, create table and exit
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("find table descriptor but no archive dir for table " + tableName
+                + ", will only create table");
+          }
+          tableDescriptor.setName(newTableName);
+          checkAndCreateTable(hbadmin, tableBackupPath, tableName, newTableName, null, 
+              tableDescriptor, truncateIfExists);
+          return;
+        } else {
+          throw new IllegalStateException("Cannot restore hbase table because directory '"
+              + " tableArchivePath is null.");
         }
-        tableDescriptor.setName(newTableName);
-        checkAndCreateTable(tableBackupPath, tableName, newTableName, null, 
-          tableDescriptor, truncateIfExists);
-        return;
-      } else {
-        throw new IllegalStateException("Cannot restore hbase table because directory '"
-            + " tableArchivePath is null.");
       }
-    }
 
-    if (tableDescriptor == null) {
-      tableDescriptor = new HTableDescriptor(newTableName);
-    } else {
-      tableDescriptor.setName(newTableName);
-    }
+      if (tableDescriptor == null) {
+        LOG.debug("New descriptor for " + newTableName);
+        tableDescriptor = new HTableDescriptor(newTableName);
+      } else {
+        tableDescriptor.setName(newTableName);
+      }
 
-    if (!converted) {
-      // record all region dirs:
-      // load all files in dir
-      try {
-        ArrayList<Path> regionPathList = getRegionList(tableName);
-
-        // should only try to create the table with all region informations, so we could pre-split
-        // the regions in fine grain
-        checkAndCreateTable(tableBackupPath, tableName, newTableName, regionPathList,
-          tableDescriptor, truncateIfExists);
-        if (tableArchivePath != null) {
-          // start real restore through bulkload
-          // if the backup target is on local cluster, special action needed
-          Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath);
-          if (tempTableArchivePath.equals(tableArchivePath)) {
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath);
-            }
-          } else {
-            regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath);
+      if (!converted) {
+        // record all region dirs:
+        // load all files in dir
+        try {
+          ArrayList<Path> regionPathList = getRegionList(tableName);
+
+          // should only try to create the table with all region informations, so we could pre-split
+          // the regions in fine grain
+          checkAndCreateTable(hbadmin, tableBackupPath, tableName, newTableName, regionPathList,
+              tableDescriptor, truncateIfExists);
+          if (tableArchivePath != null) {
+            // start real restore through bulkload
+            // if the backup target is on local cluster, special action needed
+            Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath);
+            if (tempTableArchivePath.equals(tableArchivePath)) {
+              if(LOG.isDebugEnabled()) {
+                LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath);
+              }
+            } else {
+              regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir
+              if(LOG.isDebugEnabled()) {
+                LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath);
+              }
             }
-          }
 
-          LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false);
-          for (Path regionPath : regionPathList) {
-            String regionName = regionPath.toString();
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("Restoring HFiles from directory " + regionName);
+            LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false);
+            for (Path regionPath : regionPathList) {
+              String regionName = regionPath.toString();
+              if(LOG.isDebugEnabled()) {
+                LOG.debug("Restoring HFiles from directory " + regionName);
+              }
+              String[] args = { regionName, newTableName.getNameAsString() };
+              loader.run(args);
             }
-            String[] args = { regionName, newTableName.getNameAsString()};
-            loader.run(args);
           }
+          // we do not recovered edits
+        } catch (Exception e) {
+          throw new IllegalStateException("Cannot restore hbase table", e);
         }
-        // we do not recovered edits
-      } catch (Exception e) {
-        throw new IllegalStateException("Cannot restore hbase table", e);
+      } else {
+        LOG.debug("convert will be supported in a future jira");
       }
-    } else {
-      LOG.debug("convert will be supported in a future jira");
     }
   }
 
@@ -472,6 +564,7 @@ public class RestoreServerUtil {
     // By default, it is 32 and loader will fail if # of files in any region exceed this
     // limit. Bad for snapshot restore.
     this.conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
+    this.conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
     LoadIncrementalHFiles loader = null;
     try {
       loader = new LoadIncrementalHFiles(this.conf);
@@ -571,15 +664,11 @@ public class RestoreServerUtil {
    * @param htd table descriptor
    * @throws IOException exception
    */
-  private void checkAndCreateTable(Path tableBackupPath, TableName tableName,
+  private void checkAndCreateTable(HBaseAdmin hbadmin, Path tableBackupPath, TableName tableName,
       TableName targetTableName, ArrayList<Path> regionDirList, 
       HTableDescriptor htd, boolean truncateIfExists)
           throws IOException {
-    HBaseAdmin hbadmin = null;
-    Connection conn = null;
     try {
-      conn = ConnectionFactory.createConnection(conf);
-      hbadmin = (HBaseAdmin) conn.getAdmin();
       boolean createNew = false;
       if (hbadmin.tableExists(targetTableName)) {
         if(truncateIfExists) {
@@ -592,8 +681,8 @@ public class RestoreServerUtil {
         }
       } else {
         createNew = true;
-      }      
-      if(createNew){
+      }
+      if (createNew){
         LOG.info("Creating target table '" + targetTableName + "'");
         // if no region directory given, create the table and return
         if (regionDirList == null || regionDirList.size() == 0) {
@@ -614,13 +703,6 @@ public class RestoreServerUtil {
       }
     } catch (Exception e) {
       throw new IOException(e);
-    } finally {
-      if (hbadmin != null) {
-        hbadmin.close();
-      }
-      if(conn != null){
-        conn.close();
-      }
     }
   }
 }
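
The core of the physical-layout support is the family reconciliation that incrementalRestoreTable now performs before replaying WALs: the table descriptor recorded under the last incremental backup id is compared with the live target table, families added since the full backup are created, families dropped since then are removed, and the new modifyTableSync helper waits until every region has picked up the altered descriptor. A minimal sketch of just that step, assuming it lives in the same package so the package-private modifyTableSync above is reachable:

package org.apache.hadoop.hbase.backup.util;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;

final class SchemaReconcileSketch {
  /**
   * Brings the live target table's column families in line with the families
   * recorded in the backup image before any WALs are replayed.
   */
  static void reconcileFamilies(Admin admin, HTableDescriptor backupDescriptor,
      TableName liveTable) throws IOException {
    HTableDescriptor liveDescriptor = admin.getTableDescriptor(liveTable);
    List<HColumnDescriptor> backupFamilies = Arrays.asList(backupDescriptor.getColumnFamilies());
    List<HColumnDescriptor> liveFamilies = Arrays.asList(liveDescriptor.getColumnFamilies());
    boolean changed = false;
    // Families added after the full backup are created on the target table...
    for (HColumnDescriptor family : backupFamilies) {
      if (!liveFamilies.contains(family)) {
        liveDescriptor.addFamily(family);
        changed = true;
      }
    }
    // ...and families dropped since then are removed from it.
    for (HColumnDescriptor family : liveFamilies) {
      if (!backupFamilies.contains(family)) {
        liveDescriptor.removeFamily(family.getName());
        changed = true;
      }
    }
    if (changed) {
      // modifyTableSync (added above) blocks until all regions report the new
      // descriptor, so the subsequent WAL replay sees the final schema.
      RestoreServerUtil.modifyTableSync(admin, liveDescriptor);
    }
  }
}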

http://git-wip-us.apache.org/repos/asf/hbase/blob/28737d05/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 5d75d56..f7a5378 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -117,9 +117,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     = "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
   private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
   public final static String CREATE_TABLE_CONF_KEY = "create.table";
+  public final static String IGNORE_UNMATCHED_CF_CONF_KEY = "ignore.unmatched.families";
 
   private int maxFilesPerRegionPerFamily;
   private boolean assignSeqIds;
+  private Set<String> unmatchedFamilies = new HashSet<String>();
 
   // Source filesystem
   private FileSystem fs;
@@ -157,7 +159,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private void usage() {
     System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D"
         + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n"
-        + "  Note: if you set this to 'no', then the target table must already exist in HBase\n"
+        + "  Note: if you set this to 'no', then the target table must already exist in HBase\n -D"
+        + IGNORE_UNMATCHED_CF_CONF_KEY + "=yes - can be used to ignore unmatched column families\n"
         + "\n");
   }
 
@@ -305,7 +308,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       throws TableNotFoundException, IOException {
     try (Admin admin = table.getConnection().getAdmin();
         RegionLocator rl = table.getRegionLocator()) {
-      doBulkLoad(hfofDir, admin, table, rl);
+      doBulkLoad(hfofDir, admin, table, rl, false);
     }
   }
 
@@ -315,11 +318,30 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    *
    * @param hfofDir the directory that was provided as the output path
    * of a job using HFileOutputFormat
+   * @param admin the Admin
    * @param table the table to load into
+   * @param regionLocator region locator
    * @throws TableNotFoundException if table does not yet exist
    */
   public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
       RegionLocator regionLocator) throws TableNotFoundException, IOException  {
+    doBulkLoad(hfofDir, admin, table, regionLocator, false);
+  }
+
+  /**
+   * Perform a bulk load of the given directory into the given
+   * pre-existing table.  This method is not threadsafe.
+   *
+   * @param hfofDir the directory that was provided as the output path
+   * of a job using HFileOutputFormat
+   * @param admin the Admin
+   * @param table the table to load into
+   * @param regionLocator region locator
+   * @param ignoreUnmatchedCF true to ignore unmatched column families
+   * @throws TableNotFoundException if table does not yet exist
+   */
+  public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
+      RegionLocator regionLocator, boolean ignoreUnmatchedCF) throws TableNotFoundException, IOException  {
 
     if (!admin.isTableAvailable(regionLocator.getName())) {
       throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
@@ -342,7 +364,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 	    "option, consider removing the files and bulkload again without this option. " +
 	    "See HBASE-13985");
       }
-      prepareHFileQueue(hfofDir, table, queue, validateHFile);
+      prepareHFileQueue(hfofDir, table, queue, validateHFile, ignoreUnmatchedCF);
 
       int count = 0;
 
@@ -431,8 +453,24 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    */
   public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
       boolean validateHFile) throws IOException {
+    prepareHFileQueue(hfilesDir, table, queue, validateHFile, false);
+  }
+
+  /**
+   * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
+   * passed directory and validates whether the prepared queue has all the valid table column
+   * families in it.
+   * @param hfilesDir directory containing list of hfiles to be loaded into the table
+   * @param table table to which hfiles should be loaded
+   * @param queue queue which needs to be loaded into the table
+   * @param validateHFile if true hfiles will be validated for its format
+   * @param ignoreUnmatchedCF true to ignore unmatched column families
+   * @throws IOException If any I/O or network error occurred
+   */
+  public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
+      boolean validateHFile, boolean ignoreUnmatchedCF) throws IOException {
     discoverLoadQueue(queue, hfilesDir, validateHFile);
-    validateFamiliesInHFiles(table, queue);
+    validateFamiliesInHFiles(table, queue, ignoreUnmatchedCF);
   }
 
   // Initialize a thread pool
@@ -448,14 +486,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   /**
    * Checks whether there is any invalid family name in HFiles to be bulk loaded.
    */
-  private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue)
+  private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue, boolean silence)
       throws IOException {
     Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
     List<String> familyNames = new ArrayList<String>(families.size());
     for (HColumnDescriptor family : families) {
       familyNames.add(family.getNameAsString());
     }
-    List<String> unmatchedFamilies = new ArrayList<String>();
     Iterator<LoadQueueItem> queueIter = queue.iterator();
     while (queueIter.hasNext()) {
       LoadQueueItem lqi = queueIter.next();
@@ -470,7 +507,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
               + unmatchedFamilies + "; valid family names of table " + table.getName() + " are: "
               + familyNames;
       LOG.error(msg);
-      throw new IOException(msg);
+      if (!silence) throw new IOException(msg);
     }
   }
 
@@ -774,7 +811,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     final List<Pair<byte[], String>> famPaths =
       new ArrayList<Pair<byte[], String>>(lqis.size());
     for (LoadQueueItem lqi : lqis) {
-      famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
+      if (!unmatchedFamilies.contains(Bytes.toString(lqi.family))) {
+        famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
+      }
     }
 
     final RegionServerCallable<Boolean> svrCallable =
@@ -1028,7 +1067,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   @Override
   public int run(String[] args) throws Exception {
-    if (args.length != 2) {
+    if (args.length < 2) {
       usage();
       return -1;
     }
@@ -1054,7 +1093,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
       try (Table table = connection.getTable(tableName);
           RegionLocator locator = connection.getRegionLocator(tableName)) {
-          doBulkLoad(hfofDir, admin, table, locator);
+          boolean silence = "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
+          doBulkLoad(hfofDir, admin, table, locator, silence);
       }
     }
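
For users of the bulk load tool, the relaxed family check is reachable either from the command line, via -Dignore.unmatched.families=yes as shown in the updated usage text, or programmatically through the new doBulkLoad overload. A small, hypothetical example of the programmatic form; the table name and HFile directory are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadIgnoreUnmatchedExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("my_table");   // hypothetical table
    Path hfofDir = new Path("/tmp/hfiles");                // hypothetical HFileOutputFormat output dir
    try (Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        Table table = connection.getTable(tableName);
        RegionLocator locator = connection.getRegionLocator(tableName)) {
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
      // true => HFiles whose column family no longer exists on the table are
      // skipped instead of failing the whole load.
      loader.doBulkLoad(hfofDir, admin, table, locator, true);
    }
  }
}

With the flag left unset (the default), validateFamiliesInHFiles still throws on unmatched families, so existing callers keep the previous fail-fast behaviour.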
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/28737d05/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index 0db96be..d617b52 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -120,7 +120,7 @@ public class TestBackupBase {
   
   public static void waitForSystemTable() throws Exception
   {
-    try(Admin admin = TEST_UTIL.getAdmin();) {
+    try (Admin admin = TEST_UTIL.getAdmin();) {
       while (!admin.tableExists(BackupSystemTable.getTableName()) 
           || !admin.isTableAvailable(BackupSystemTable.getTableName())) {
         Thread.sleep(1000);
@@ -142,6 +142,18 @@ public class TestBackupBase {
     TEST_UTIL.shutdownMiniMapReduceCluster();
   }
 
+  HTable insertIntoTable(Connection conn, TableName table, byte[] family, int id, int numRows)
+      throws IOException {
+    HTable t = (HTable) conn.getTable(table);
+    Put p1;
+    for (int i = 0; i < numRows; i++) {
+      p1 = new Put(Bytes.toBytes("row-" + table + "-" + id + "-"+ i));
+      p1.addColumn(family, qualName, Bytes.toBytes("val" + i));
+      t.put(p1);
+    }
+    return t;
+  }
+
   protected String backupTables(BackupType type, List<TableName> tables, String path)
       throws IOException {
     Connection conn = null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/28737d05/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
index 9a2ad89..ae37b4c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
@@ -20,12 +20,15 @@ package org.apache.hadoop.hbase.backup;
 
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.BackupAdmin;
 import org.apache.hadoop.hbase.client.Connection;
@@ -66,8 +69,15 @@ public class TestIncrementalBackup extends TestBackupBase {
     LOG.info("create full backup image for all tables");
 
     List<TableName> tables = Lists.newArrayList(table1, table2);
-    HBaseAdmin admin = null;
+    final byte[] fam3Name = Bytes.toBytes("f3");
+    table1Desc.addFamily(new HColumnDescriptor(fam3Name));
+    HBaseTestingUtility.modifyTableSync(TEST_UTIL.getAdmin(), table1Desc);
+
     Connection conn = ConnectionFactory.createConnection(conf1);
+    int NB_ROWS_FAM3 = 6;
+    insertIntoTable(conn, table1, fam3Name, 3, NB_ROWS_FAM3).close();
+
+    HBaseAdmin admin = null;
     admin = (HBaseAdmin) conn.getAdmin();
 
     BackupRequest request = new BackupRequest();
@@ -77,16 +87,11 @@ public class TestIncrementalBackup extends TestBackupBase {
     assertTrue(checkSucceeded(backupIdFull));
 
     // #2 - insert some data to table
+    HTable t1 = insertIntoTable(conn, table1, famName, 1, NB_ROWS_IN_BATCH);
     LOG.debug("writing " + NB_ROWS_IN_BATCH + " rows to " + table1);
-    HTable t1 = (HTable) conn.getTable(table1);
-    Put p1;
-    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
-      p1 = new Put(Bytes.toBytes("row-t1" + i));
-      p1.addColumn(famName, qualName, Bytes.toBytes("val" + i));
-      t1.put(p1);
-    }
 
-    Assert.assertThat(TEST_UTIL.countRows(t1), CoreMatchers.equalTo(NB_ROWS_IN_BATCH * 2));
+    Assert.assertThat(TEST_UTIL.countRows(t1), CoreMatchers.equalTo(
+        NB_ROWS_IN_BATCH * 2 + NB_ROWS_FAM3));
     t1.close();
     LOG.debug("written " + NB_ROWS_IN_BATCH + " rows to " + table1);
 
@@ -110,6 +115,24 @@ public class TestIncrementalBackup extends TestBackupBase {
     String backupIdIncMultiple = admin.getBackupAdmin().backupTables(request);
     assertTrue(checkSucceeded(backupIdIncMultiple));
 
+    // add column family f2 to table1
+    final byte[] fam2Name = Bytes.toBytes("f2");
+    table1Desc.addFamily(new HColumnDescriptor(fam2Name));
+    // drop column family f3
+    table1Desc.removeFamily(fam3Name);
+    HBaseTestingUtility.modifyTableSync(TEST_UTIL.getAdmin(), table1Desc);
+
+    int NB_ROWS_FAM2 = 7;
+    HTable t3 = insertIntoTable(conn, table1, fam2Name, 2, NB_ROWS_FAM2);
+    t3.close();
+
+    // #3 - incremental backup for multiple tables
+    request = new BackupRequest();
+    request.setBackupType(BackupType.INCREMENTAL).setTableList(tables)
+    .setTargetRootDir(BACKUP_ROOT_DIR);
+    String backupIdIncMultiple2 = admin.getBackupAdmin().backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple2));
+
     // #4 - restore full backup for all tables, without overwrite
     TableName[] tablesRestoreFull =
         new TableName[] { table1, table2 };
@@ -118,6 +141,7 @@ public class TestIncrementalBackup extends TestBackupBase {
         new TableName[] { table1_restore, table2_restore };
 
     BackupAdmin client = getBackupAdmin();
+    LOG.debug("Restoring full " + backupIdFull);
     client.restore(createRestoreRequest(BACKUP_ROOT_DIR, backupIdFull, false,
       tablesRestoreFull,
       tablesMapFull, false));
@@ -131,7 +155,8 @@ public class TestIncrementalBackup extends TestBackupBase {
 
     // #5.2 - checking row count of tables for full restore
     HTable hTable = (HTable) conn.getTable(table1_restore);
-    Assert.assertThat(TEST_UTIL.countRows(hTable), CoreMatchers.equalTo(NB_ROWS_IN_BATCH));
+    Assert.assertThat(TEST_UTIL.countRows(hTable), CoreMatchers.equalTo(NB_ROWS_IN_BATCH +
+        NB_ROWS_FAM3));
     hTable.close();
 
     hTable = (HTable) conn.getTable(table2_restore);
@@ -143,11 +168,16 @@ public class TestIncrementalBackup extends TestBackupBase {
         new TableName[] { table1, table2 };
     TableName[] tablesMapIncMultiple =
         new TableName[] { table1_restore, table2_restore };
-    client.restore(createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple, false,
+    client.restore(createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple2, false,
       tablesRestoreIncMultiple, tablesMapIncMultiple, true));
 
     hTable = (HTable) conn.getTable(table1_restore);
-    Assert.assertThat(TEST_UTIL.countRows(hTable), CoreMatchers.equalTo(NB_ROWS_IN_BATCH * 2));
+    LOG.debug("After incremental restore: " + hTable.getTableDescriptor());
+    LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows");
+    Assert.assertThat(TEST_UTIL.countRows(hTable, famName),
+        CoreMatchers.equalTo(NB_ROWS_IN_BATCH * 2));
+    LOG.debug("f2 has " + TEST_UTIL.countRows(hTable, fam2Name) + " rows");
+    Assert.assertThat(TEST_UTIL.countRows(hTable, fam2Name), CoreMatchers.equalTo(NB_ROWS_FAM2));
     hTable.close();
 
     hTable = (HTable) conn.getTable(table2_restore);