Posted to commits@hbase.apache.org by te...@apache.org on 2016/09/06 15:49:28 UTC
hbase git commit: HBASE-15449 HBase Backup Phase 3: Support physical table layout change
Repository: hbase
Updated Branches:
refs/heads/HBASE-7912 6d48260c8 -> 28737d05f
HBASE-15449 HBase Backup Phase 3: Support physical table layout change
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/28737d05
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/28737d05
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/28737d05
Branch: refs/heads/HBASE-7912
Commit: 28737d05f4c336253dd161743accdad31f30efcc
Parents: 6d48260
Author: tedyu <yu...@gmail.com>
Authored: Tue Sep 6 08:48:58 2016 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Tue Sep 6 08:48:58 2016 -0700
----------------------------------------------------------------------
.../hbase/backup/impl/RestoreClientImpl.java | 29 +-
.../hbase/backup/util/BackupServerUtil.java | 3 +-
.../hbase/backup/util/RestoreServerUtil.java | 274 ++++++++++++-------
.../hbase/mapreduce/LoadIncrementalHFiles.java | 60 +++-
.../hadoop/hbase/backup/TestBackupBase.java | 14 +-
.../hbase/backup/TestIncrementalBackup.java | 54 +++-
6 files changed, 302 insertions(+), 132 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/28737d05/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java
index a04fc08..7f23ce0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java
@@ -261,35 +261,40 @@ public final class RestoreClientImpl implements RestoreClient {
// TODO: convert feature will be provided in a future JIRA
boolean converted = false;
+ String lastIncrBackupId = null;
+ List<String> logDirList = null;
+ // Scan incremental backups
+ if (it.hasNext()) {
+ // obtain the backupId for most recent incremental
+ logDirList = new ArrayList<String>();
+ while (it.hasNext()) {
+ BackupImage im = it.next();
+ String logBackupDir = HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId());
+ logDirList.add(logBackupDir);
+ lastIncrBackupId = im.getBackupId();
+ }
+ }
if (manifest.getType() == BackupType.FULL || converted) {
LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from "
+ (converted ? "converted" : "full") + " backup image " + tableBackupPath.toString());
restoreTool.fullRestoreTable(tableBackupPath, sTable, tTable,
- converted, truncateIfExists);
-
+ converted, truncateIfExists, lastIncrBackupId);
} else { // incremental Backup
throw new IOException("Unexpected backup type " + image.getType());
}
// The rest one are incremental
- if (it.hasNext()) {
- List<String> logDirList = new ArrayList<String>();
- while (it.hasNext()) {
- BackupImage im = it.next();
- String logBackupDir = HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId());
- logDirList.add(logBackupDir);
- }
+ if (logDirList != null) {
String logDirs = StringUtils.join(logDirList, ",");
LOG.info("Restoring '" + sTable + "' to '" + tTable
+ "' from log dirs: " + logDirs);
String[] sarr = new String[logDirList.size()];
logDirList.toArray(sarr);
Path[] paths = org.apache.hadoop.util.StringUtils.stringToPath(sarr);
- restoreTool.incrementalRestoreTable(paths, new TableName[] { sTable },
- new TableName[] { tTable });
+ restoreTool.incrementalRestoreTable(tableBackupPath, paths, new TableName[] { sTable },
+ new TableName[] { tTable }, lastIncrBackupId);
}
LOG.info(sTable + " has been successfully restored to " + tTable);
}
-
}
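
Note on the hunk above: the scan of the incremental-image iterator now runs before the full restore, so the id of the most recent incremental backup is in hand when the table descriptor is selected. A condensed sketch of the resulting control flow (not the verbatim method body; `it`, `restoreTool`, and the other names are those used in the patch):

    String lastIncrBackupId = null;
    List<String> logDirList = null;
    while (it.hasNext()) {                     // walk the chain of incremental images
      if (logDirList == null) {
        logDirList = new ArrayList<String>();
      }
      BackupImage im = it.next();
      logDirList.add(HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId()));
      lastIncrBackupId = im.getBackupId();     // ends up as the most recent id
    }
    // Full restore can now pick up the newest table descriptor instead of the
    // schema frozen at full-backup time.
    restoreTool.fullRestoreTable(tableBackupPath, sTable, tTable, converted,
        truncateIfExists, lastIncrBackupId);
    if (logDirList != null) {                  // then replay the collected WAL dirs
      String[] sarr = logDirList.toArray(new String[logDirList.size()]);
      Path[] paths = org.apache.hadoop.util.StringUtils.stringToPath(sarr);
      restoreTool.incrementalRestoreTable(tableBackupPath, paths,
          new TableName[] { sTable }, new TableName[] { tTable }, lastIncrBackupId);
    }
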
http://git-wip-us.apache.org/repos/asf/hbase/blob/28737d05/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupServerUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupServerUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupServerUtil.java
index 265ef6c..37c8d65 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupServerUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupServerUtil.java
@@ -172,7 +172,6 @@ public final class BackupServerUtil {
LOG.warn("Table "+ table+" does not exists, skipping it.");
continue;
}
- LOG.debug("Attempting to copy table info for:" + table);
TableDescriptor orig = FSTableDescriptors.getTableDescriptorFromFs(fs, rootDir, table);
// write a copy of descriptor to the target directory
@@ -181,6 +180,8 @@ public final class BackupServerUtil {
FSTableDescriptors descriptors =
new FSTableDescriptors(conf, targetFs, FSUtils.getRootDir(conf));
descriptors.createTableDescriptorForTableDirectory(target, orig, false);
+ LOG.debug("Attempting to copy table info for:" + table + " target: " + target +
+ " descriptor: " + orig);
LOG.debug("Finished copying tableinfo.");
List<HRegionInfo> regions = null;
regions = admin.getTableRegions(table);
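
For context, this step persists each source table's descriptor alongside the backup data so a later restore can recover the schema. A minimal sketch of the copy, assuming `fs`/`targetFs`, `rootDir`, and the per-table `target` path are already computed as in the patch:

    // Read the descriptor from the source cluster's filesystem ...
    TableDescriptor orig = FSTableDescriptors.getTableDescriptorFromFs(fs, rootDir, table);
    // ... and write a copy under the backup target directory (no forced overwrite).
    FSTableDescriptors descriptors =
        new FSTableDescriptors(conf, targetFs, FSUtils.getRootDir(conf));
    descriptors.createTableDescriptorForTableDirectory(target, orig, false);
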
http://git-wip-us.apache.org/repos/asf/hbase/blob/28737d05/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
index 6f83f25..620d622 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
@@ -20,8 +20,11 @@ package org.apache.hadoop.hbase.backup.util;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
@@ -31,9 +34,11 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupRestoreServerFactory;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.IncrementalRestoreService;
@@ -55,6 +60,8 @@ import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.Pair;
/**
* A collection for methods used by multiple classes to restore HBase tables.
@@ -133,21 +140,54 @@ public class RestoreServerUtil {
return regionDirList;
}
+ static void modifyTableSync(Admin admin, HTableDescriptor desc)
+ throws IOException {
+ admin.modifyTable(desc.getTableName(), desc);
+ Pair<Integer, Integer> status = new Pair<Integer, Integer>() {{
+ setFirst(0);
+ setSecond(0);
+ }};
+ int i = 0;
+ do {
+ status = admin.getAlterStatus(desc.getTableName());
+ if (status.getSecond() != 0) {
+ LOG.debug(status.getSecond() - status.getFirst() + "/" + status.getSecond()
+ + " regions updated.");
+ try {
+ Thread.sleep(1 * 1000l);
+ } catch (InterruptedException ie) {
+ InterruptedIOException iie = new InterruptedIOException();
+ iie.initCause(ie);
+ throw iie;
+ }
+ } else {
+ LOG.debug("All regions updated.");
+ break;
+ }
+ } while (status.getFirst() != 0 && i++ < 500);
+ if (status.getFirst() != 0) {
+ throw new IOException("Failed to update all regions even after 500 seconds.");
+ }
+ }
+
/**
* During incremental backup operation. Call WalPlayer to replay WAL in backup image Currently
* tableNames and newTablesNames only contain single table, will be expanded to multiple tables in
* the future
- * @param logDir : incremental backup folders, which contains WAL
+ * @param tableBackupPath backup path
+ * @param logDirs : incremental backup folders, which contains WAL
* @param tableNames : source tableNames(table names were backuped)
* @param newTableNames : target tableNames(table names to be restored to)
+ * @param incrBackupId incremental backup Id
* @throws IOException exception
*/
- public void incrementalRestoreTable(Path[] logDirs,
- TableName[] tableNames, TableName[] newTableNames) throws IOException {
+ public void incrementalRestoreTable(Path tableBackupPath, Path[] logDirs, TableName[] tableNames,
+ TableName[] newTableNames, String incrBackupId) throws IOException {
if (tableNames.length != newTableNames.length) {
throw new IOException("Number of source tables and target tables does not match!");
}
+ FileSystem fileSys = tableBackupPath.getFileSystem(this.conf);
// for incremental backup image, expect the table already created either by user or previous
// full backup. Here, check that all new tables exists
@@ -160,6 +200,35 @@ public class RestoreServerUtil {
+ " does not exist. Create the table first, e.g. by restoring a full backup.");
}
}
+ // adjust table schema
+ for (int i = 0; i < tableNames.length; i++) {
+ TableName tableName = tableNames[i];
+ HTableDescriptor tableDescriptor = getTableDescriptor(fileSys, tableName, incrBackupId);
+ LOG.debug("Found descriptor " + tableDescriptor + " through " + incrBackupId);
+
+ TableName newTableName = newTableNames[i];
+ HTableDescriptor newTableDescriptor = admin.getTableDescriptor(newTableName);
+ List<HColumnDescriptor> families = Arrays.asList(tableDescriptor.getColumnFamilies());
+ List<HColumnDescriptor> existingFamilies =
+ Arrays.asList(newTableDescriptor.getColumnFamilies());
+ boolean schemaChangeNeeded = false;
+ for (HColumnDescriptor family : families) {
+ if (!existingFamilies.contains(family)) {
+ newTableDescriptor.addFamily(family);
+ schemaChangeNeeded = true;
+ }
+ }
+ for (HColumnDescriptor family : existingFamilies) {
+ if (!families.contains(family)) {
+ newTableDescriptor.removeFamily(family.getName());
+ schemaChangeNeeded = true;
+ }
+ }
+ if (schemaChangeNeeded) {
+ RestoreServerUtil.modifyTableSync(admin, newTableDescriptor);
+ LOG.info("Changed " + newTableDescriptor.getTableName() + " to: " + newTableDescriptor);
+ }
+ }
IncrementalRestoreService restoreService =
BackupRestoreServerFactory.getIncrementalRestoreService(conf);
@@ -168,8 +237,9 @@ public class RestoreServerUtil {
}
public void fullRestoreTable(Path tableBackupPath, TableName tableName, TableName newTableName,
- boolean converted, boolean truncateIfExists) throws IOException {
- restoreTableAndCreate(tableName, newTableName, tableBackupPath, converted, truncateIfExists);
+ boolean converted, boolean truncateIfExists, String lastIncrBackupId) throws IOException {
+ restoreTableAndCreate(tableName, newTableName, tableBackupPath, converted, truncateIfExists,
+ lastIncrBackupId);
}
/**
@@ -275,104 +345,126 @@ public class RestoreServerUtil {
return tableArchivePath;
}
+ private HTableDescriptor getTableDescriptor(FileSystem fileSys, TableName tableName,
+ String lastIncrBackupId) throws IOException {
+ if (lastIncrBackupId != null) {
+ String target = BackupClientUtil.getTableBackupDir(backupRootPath.toString(),
+ lastIncrBackupId, tableName);
+ // Path target = new Path(info.getBackupStatus(tableName).getTargetDir());
+ return FSTableDescriptors.getTableDescriptorFromFs(fileSys,
+ new Path(target)).getHTableDescriptor();
+ }
+ return null;
+ }
+
private void restoreTableAndCreate(TableName tableName, TableName newTableName,
- Path tableBackupPath, boolean converted, boolean truncateIfExists) throws IOException {
+ Path tableBackupPath, boolean converted, boolean truncateIfExists, String lastIncrBackupId)
+ throws IOException {
if (newTableName == null || newTableName.equals("")) {
newTableName = tableName;
}
FileSystem fileSys = tableBackupPath.getFileSystem(this.conf);
- // get table descriptor first
- HTableDescriptor tableDescriptor = null;
-
- Path tableSnapshotPath = getTableSnapshotPath(backupRootPath, tableName, backupId);
+ HTableDescriptor tableDescriptor = getTableDescriptor(fileSys, tableName, lastIncrBackupId);
+ if (tableDescriptor != null) {
+ LOG.debug("Retrieved descriptor: " + tableDescriptor + " thru " + lastIncrBackupId);
+ }
- if (fileSys.exists(tableSnapshotPath)) {
- // snapshot path exist means the backup path is in HDFS
- // check whether snapshot dir already recorded for target table
- if (snapshotMap.get(tableName) != null) {
- SnapshotDescription desc =
- SnapshotDescriptionUtils.readSnapshotInfo(fileSys, tableSnapshotPath);
- SnapshotManifest manifest = SnapshotManifest.open(conf, fileSys, tableSnapshotPath, desc);
- tableDescriptor = manifest.getTableDescriptor();
- } else {
- tableDescriptor = getTableDesc(tableName);
- snapshotMap.put(tableName, getTableInfoPath(tableName));
- }
+ try (Connection conn = ConnectionFactory.createConnection(conf);
+ HBaseAdmin hbadmin = (HBaseAdmin) conn.getAdmin();) {
if (tableDescriptor == null) {
- LOG.debug("Found no table descriptor in the snapshot dir, previous schema would be lost");
+ Path tableSnapshotPath = getTableSnapshotPath(backupRootPath, tableName, backupId);
+ if (fileSys.exists(tableSnapshotPath)) {
+ // snapshot path exist means the backup path is in HDFS
+ // check whether snapshot dir already recorded for target table
+ if (snapshotMap.get(tableName) != null) {
+ SnapshotDescription desc =
+ SnapshotDescriptionUtils.readSnapshotInfo(fileSys, tableSnapshotPath);
+ SnapshotManifest manifest = SnapshotManifest.open(conf,fileSys,tableSnapshotPath,desc);
+ tableDescriptor = manifest.getTableDescriptor();
+ LOG.debug("obtained descriptor from " + manifest);
+ } else {
+ tableDescriptor = getTableDesc(tableName);
+ snapshotMap.put(tableName, getTableInfoPath(tableName));
+ LOG.debug("obtained descriptor from snapshot for " + tableName);
+ }
+ if (tableDescriptor == null) {
+ LOG.debug("Found no table descriptor in the snapshot dir, previous schema was lost");
+ }
+ } else if (converted) {
+ // first check if this is a converted backup image
+ LOG.error("convert will be supported in a future jira");
+ }
}
- } else if (converted) {
- // first check if this is a converted backup image
- LOG.error("convert will be supported in a future jira");
- }
- Path tableArchivePath = getTableArchivePath(tableName);
- if (tableArchivePath == null) {
- if (tableDescriptor != null) {
- // find table descriptor but no archive dir means the table is empty, create table and exit
- if(LOG.isDebugEnabled()) {
- LOG.debug("find table descriptor but no archive dir for table " + tableName
- + ", will only create table");
+ Path tableArchivePath = getTableArchivePath(tableName);
+ if (tableArchivePath == null) {
+ if (tableDescriptor != null) {
+ // find table descriptor but no archive dir => the table is empty, create table and exit
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("find table descriptor but no archive dir for table " + tableName
+ + ", will only create table");
+ }
+ tableDescriptor.setName(newTableName);
+ checkAndCreateTable(hbadmin, tableBackupPath, tableName, newTableName, null,
+ tableDescriptor, truncateIfExists);
+ return;
+ } else {
+ throw new IllegalStateException("Cannot restore hbase table because directory '"
+ + " tableArchivePath is null.");
}
- tableDescriptor.setName(newTableName);
- checkAndCreateTable(tableBackupPath, tableName, newTableName, null,
- tableDescriptor, truncateIfExists);
- return;
- } else {
- throw new IllegalStateException("Cannot restore hbase table because directory '"
- + " tableArchivePath is null.");
}
- }
- if (tableDescriptor == null) {
- tableDescriptor = new HTableDescriptor(newTableName);
- } else {
- tableDescriptor.setName(newTableName);
- }
+ if (tableDescriptor == null) {
+ LOG.debug("New descriptor for " + newTableName);
+ tableDescriptor = new HTableDescriptor(newTableName);
+ } else {
+ tableDescriptor.setName(newTableName);
+ }
- if (!converted) {
- // record all region dirs:
- // load all files in dir
- try {
- ArrayList<Path> regionPathList = getRegionList(tableName);
-
- // should only try to create the table with all region informations, so we could pre-split
- // the regions in fine grain
- checkAndCreateTable(tableBackupPath, tableName, newTableName, regionPathList,
- tableDescriptor, truncateIfExists);
- if (tableArchivePath != null) {
- // start real restore through bulkload
- // if the backup target is on local cluster, special action needed
- Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath);
- if (tempTableArchivePath.equals(tableArchivePath)) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath);
- }
- } else {
- regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir
- if(LOG.isDebugEnabled()) {
- LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath);
+ if (!converted) {
+ // record all region dirs:
+ // load all files in dir
+ try {
+ ArrayList<Path> regionPathList = getRegionList(tableName);
+
+ // should only try to create the table with all region informations, so we could pre-split
+ // the regions in fine grain
+ checkAndCreateTable(hbadmin, tableBackupPath, tableName, newTableName, regionPathList,
+ tableDescriptor, truncateIfExists);
+ if (tableArchivePath != null) {
+ // start real restore through bulkload
+ // if the backup target is on local cluster, special action needed
+ Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath);
+ if (tempTableArchivePath.equals(tableArchivePath)) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath);
+ }
+ } else {
+ regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath);
+ }
}
- }
- LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false);
- for (Path regionPath : regionPathList) {
- String regionName = regionPath.toString();
- if(LOG.isDebugEnabled()) {
- LOG.debug("Restoring HFiles from directory " + regionName);
+ LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false);
+ for (Path regionPath : regionPathList) {
+ String regionName = regionPath.toString();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Restoring HFiles from directory " + regionName);
+ }
+ String[] args = { regionName, newTableName.getNameAsString() };
+ loader.run(args);
}
- String[] args = { regionName, newTableName.getNameAsString()};
- loader.run(args);
}
+ // we do not recovered edits
+ } catch (Exception e) {
+ throw new IllegalStateException("Cannot restore hbase table", e);
}
- // we do not recovered edits
- } catch (Exception e) {
- throw new IllegalStateException("Cannot restore hbase table", e);
+ } else {
+ LOG.debug("convert will be supported in a future jira");
}
- } else {
- LOG.debug("convert will be supported in a future jira");
}
}
@@ -472,6 +564,7 @@ public class RestoreServerUtil {
// By default, it is 32 and loader will fail if # of files in any region exceed this
// limit. Bad for snapshot restore.
this.conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
+ this.conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
LoadIncrementalHFiles loader = null;
try {
loader = new LoadIncrementalHFiles(this.conf);
@@ -571,15 +664,11 @@ public class RestoreServerUtil {
* @param htd table descriptor
* @throws IOException exception
*/
- private void checkAndCreateTable(Path tableBackupPath, TableName tableName,
+ private void checkAndCreateTable(HBaseAdmin hbadmin, Path tableBackupPath, TableName tableName,
TableName targetTableName, ArrayList<Path> regionDirList,
HTableDescriptor htd, boolean truncateIfExists)
throws IOException {
- HBaseAdmin hbadmin = null;
- Connection conn = null;
try {
- conn = ConnectionFactory.createConnection(conf);
- hbadmin = (HBaseAdmin) conn.getAdmin();
boolean createNew = false;
if (hbadmin.tableExists(targetTableName)) {
if(truncateIfExists) {
@@ -592,8 +681,8 @@ public class RestoreServerUtil {
}
} else {
createNew = true;
- }
- if(createNew){
+ }
+ if (createNew){
LOG.info("Creating target table '" + targetTableName + "'");
// if no region directory given, create the table and return
if (regionDirList == null || regionDirList.size() == 0) {
@@ -614,13 +703,6 @@ public class RestoreServerUtil {
}
} catch (Exception e) {
throw new IOException(e);
- } finally {
- if (hbadmin != null) {
- hbadmin.close();
- }
- if(conn != null){
- conn.close();
- }
}
}
}
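
The heart of the layout-change support is the schema reconciliation that incrementalRestoreTable now performs before replaying WALs: the descriptor captured at the last incremental backup is compared against the live target table, and families are added or dropped so the two match. A condensed sketch, assuming `admin`, `tableDescriptor` (from the backup image), and `newTableDescriptor` (from the live table) are in scope:

    List<HColumnDescriptor> families = Arrays.asList(tableDescriptor.getColumnFamilies());
    List<HColumnDescriptor> existingFamilies =
        Arrays.asList(newTableDescriptor.getColumnFamilies());
    boolean schemaChangeNeeded = false;
    for (HColumnDescriptor family : families) {         // families added since full backup
      if (!existingFamilies.contains(family)) {
        newTableDescriptor.addFamily(family);
        schemaChangeNeeded = true;
      }
    }
    for (HColumnDescriptor family : existingFamilies) { // families dropped since full backup
      if (!families.contains(family)) {
        newTableDescriptor.removeFamily(family.getName());
        schemaChangeNeeded = true;
      }
    }
    if (schemaChangeNeeded) {
      // modifyTableSync polls Admin#getAlterStatus until no regions remain pending
      RestoreServerUtil.modifyTableSync(admin, newTableDescriptor);
    }
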
http://git-wip-us.apache.org/repos/asf/hbase/blob/28737d05/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 5d75d56..f7a5378 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -117,9 +117,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
= "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
public final static String CREATE_TABLE_CONF_KEY = "create.table";
+ public final static String IGNORE_UNMATCHED_CF_CONF_KEY = "ignore.unmatched.families";
private int maxFilesPerRegionPerFamily;
private boolean assignSeqIds;
+ private Set<String> unmatchedFamilies = new HashSet<String>();
// Source filesystem
private FileSystem fs;
@@ -157,7 +159,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
private void usage() {
System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D"
+ CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n"
- + " Note: if you set this to 'no', then the target table must already exist in HBase\n"
+ + " Note: if you set this to 'no', then the target table must already exist in HBase\n -D"
+ + IGNORE_UNMATCHED_CF_CONF_KEY + "=yes - can be used to ignore unmatched column families\n"
+ "\n");
}
@@ -305,7 +308,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
throws TableNotFoundException, IOException {
try (Admin admin = table.getConnection().getAdmin();
RegionLocator rl = table.getRegionLocator()) {
- doBulkLoad(hfofDir, admin, table, rl);
+ doBulkLoad(hfofDir, admin, table, rl, false);
}
}
@@ -315,11 +318,30 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
*
* @param hfofDir the directory that was provided as the output path
* of a job using HFileOutputFormat
+ * @param admin the Admin
* @param table the table to load into
+ * @param regionLocator region locator
* @throws TableNotFoundException if table does not yet exist
*/
public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
RegionLocator regionLocator) throws TableNotFoundException, IOException {
+ doBulkLoad(hfofDir, admin, table, regionLocator, false);
+ }
+
+ /**
+ * Perform a bulk load of the given directory into the given
+ * pre-existing table. This method is not threadsafe.
+ *
+ * @param hfofDir the directory that was provided as the output path
+ * of a job using HFileOutputFormat
+ * @param admin the Admin
+ * @param table the table to load into
+ * @param regionLocator region locator
+ * @param ignoreUnmatchedCF true to ignore unmatched column families
+ * @throws TableNotFoundException if table does not yet exist
+ */
+ public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
+ RegionLocator regionLocator, boolean ignoreUnmatchedCF) throws TableNotFoundException, IOException {
if (!admin.isTableAvailable(regionLocator.getName())) {
throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
@@ -342,7 +364,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
"option, consider removing the files and bulkload again without this option. " +
"See HBASE-13985");
}
- prepareHFileQueue(hfofDir, table, queue, validateHFile);
+ prepareHFileQueue(hfofDir, table, queue, validateHFile, ignoreUnmatchedCF);
int count = 0;
@@ -431,8 +453,24 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
*/
public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
boolean validateHFile) throws IOException {
+ prepareHFileQueue(hfilesDir, table, queue, validateHFile, false);
+ }
+
+ /**
+ * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
+ * passed directory and validates whether the prepared queue has all the valid table column
+ * families in it.
+ * @param hfilesDir directory containing list of hfiles to be loaded into the table
+ * @param table table to which hfiles should be loaded
+ * @param queue queue which needs to be loaded into the table
+ * @param validateHFile if true hfiles will be validated for its format
+ * @param ignoreUnmatchedCF true to ignore unmatched column families
+ * @throws IOException If any I/O or network error occurred
+ */
+ public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
+ boolean validateHFile, boolean ignoreUnmatchedCF) throws IOException {
discoverLoadQueue(queue, hfilesDir, validateHFile);
- validateFamiliesInHFiles(table, queue);
+ validateFamiliesInHFiles(table, queue, ignoreUnmatchedCF);
}
// Initialize a thread pool
@@ -448,14 +486,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
/**
* Checks whether there is any invalid family name in HFiles to be bulk loaded.
*/
- private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue)
+ private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue, boolean silence)
throws IOException {
Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
List<String> familyNames = new ArrayList<String>(families.size());
for (HColumnDescriptor family : families) {
familyNames.add(family.getNameAsString());
}
- List<String> unmatchedFamilies = new ArrayList<String>();
Iterator<LoadQueueItem> queueIter = queue.iterator();
while (queueIter.hasNext()) {
LoadQueueItem lqi = queueIter.next();
@@ -470,7 +507,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
+ unmatchedFamilies + "; valid family names of table " + table.getName() + " are: "
+ familyNames;
LOG.error(msg);
- throw new IOException(msg);
+ if (!silence) throw new IOException(msg);
}
}
@@ -774,7 +811,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
final List<Pair<byte[], String>> famPaths =
new ArrayList<Pair<byte[], String>>(lqis.size());
for (LoadQueueItem lqi : lqis) {
- famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
+ if (!unmatchedFamilies.contains(Bytes.toString(lqi.family))) {
+ famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
+ }
}
final RegionServerCallable<Boolean> svrCallable =
@@ -1028,7 +1067,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
- if (args.length != 2) {
+ if (args.length < 2) {
usage();
return -1;
}
@@ -1054,7 +1093,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
try (Table table = connection.getTable(tableName);
RegionLocator locator = connection.getRegionLocator(tableName)) {
- doBulkLoad(hfofDir, admin, table, locator);
+ boolean silence = "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
+ doBulkLoad(hfofDir, admin, table, locator, silence);
}
}
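
The new IGNORE_UNMATCHED_CF_CONF_KEY lets a bulk load proceed when some HFiles belong to families the target table no longer carries (for example, a family dropped between backups); previously validateFamiliesInHFiles would abort the whole load. A minimal usage sketch of the new overload, assuming an open `connection` and an HFileOutputFormat output directory `hfofDir` (the table name is illustrative; exception handling is omitted):

    Configuration conf = HBaseConfiguration.create();
    conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes"); // CLI-equivalent flag
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); // constructor may throw
    TableName tableName = TableName.valueOf("my_table");
    try (Table table = connection.getTable(tableName);
        RegionLocator locator = connection.getRegionLocator(tableName);
        Admin admin = connection.getAdmin()) {
      // true => skip HFiles whose family is absent from the target table
      loader.doBulkLoad(hfofDir, admin, table, locator, true);
    }

From the command line, the same behavior is reached with -D ignore.unmatched.families=yes.
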
http://git-wip-us.apache.org/repos/asf/hbase/blob/28737d05/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index 0db96be..d617b52 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -120,7 +120,7 @@ public class TestBackupBase {
public static void waitForSystemTable() throws Exception
{
- try(Admin admin = TEST_UTIL.getAdmin();) {
+ try (Admin admin = TEST_UTIL.getAdmin();) {
while (!admin.tableExists(BackupSystemTable.getTableName())
|| !admin.isTableAvailable(BackupSystemTable.getTableName())) {
Thread.sleep(1000);
@@ -142,6 +142,18 @@ public class TestBackupBase {
TEST_UTIL.shutdownMiniMapReduceCluster();
}
+ HTable insertIntoTable(Connection conn, TableName table, byte[] family, int id, int numRows)
+ throws IOException {
+ HTable t = (HTable) conn.getTable(table);
+ Put p1;
+ for (int i = 0; i < numRows; i++) {
+ p1 = new Put(Bytes.toBytes("row-" + table + "-" + id + "-"+ i));
+ p1.addColumn(family, qualName, Bytes.toBytes("val" + i));
+ t.put(p1);
+ }
+ return t;
+ }
+
protected String backupTables(BackupType type, List<TableName> tables, String path)
throws IOException {
Connection conn = null;
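
The new insertIntoTable helper centralizes per-family row generation for the tests; rows are keyed "row-<table>-<id>-<i>" so batches written to different families, or in different phases, stay distinct. Typical use, with names from the incremental-backup test below:

    // write six rows into family f3 of table1, then release the handle
    insertIntoTable(conn, table1, fam3Name, 3, NB_ROWS_FAM3).close();
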
http://git-wip-us.apache.org/repos/asf/hbase/blob/28737d05/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
index 9a2ad89..ae37b4c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
@@ -20,12 +20,15 @@ package org.apache.hadoop.hbase.backup;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BackupAdmin;
import org.apache.hadoop.hbase.client.Connection;
@@ -66,8 +69,15 @@ public class TestIncrementalBackup extends TestBackupBase {
LOG.info("create full backup image for all tables");
List<TableName> tables = Lists.newArrayList(table1, table2);
- HBaseAdmin admin = null;
+ final byte[] fam3Name = Bytes.toBytes("f3");
+ table1Desc.addFamily(new HColumnDescriptor(fam3Name));
+ HBaseTestingUtility.modifyTableSync(TEST_UTIL.getAdmin(), table1Desc);
+
Connection conn = ConnectionFactory.createConnection(conf1);
+ int NB_ROWS_FAM3 = 6;
+ insertIntoTable(conn, table1, fam3Name, 3, NB_ROWS_FAM3).close();
+
+ HBaseAdmin admin = null;
admin = (HBaseAdmin) conn.getAdmin();
BackupRequest request = new BackupRequest();
@@ -77,16 +87,11 @@ public class TestIncrementalBackup extends TestBackupBase {
assertTrue(checkSucceeded(backupIdFull));
// #2 - insert some data to table
+ HTable t1 = insertIntoTable(conn, table1, famName, 1, NB_ROWS_IN_BATCH);
LOG.debug("writing " + NB_ROWS_IN_BATCH + " rows to " + table1);
- HTable t1 = (HTable) conn.getTable(table1);
- Put p1;
- for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
- p1 = new Put(Bytes.toBytes("row-t1" + i));
- p1.addColumn(famName, qualName, Bytes.toBytes("val" + i));
- t1.put(p1);
- }
- Assert.assertThat(TEST_UTIL.countRows(t1), CoreMatchers.equalTo(NB_ROWS_IN_BATCH * 2));
+ Assert.assertThat(TEST_UTIL.countRows(t1), CoreMatchers.equalTo(
+ NB_ROWS_IN_BATCH * 2 + NB_ROWS_FAM3));
t1.close();
LOG.debug("written " + NB_ROWS_IN_BATCH + " rows to " + table1);
@@ -110,6 +115,24 @@ public class TestIncrementalBackup extends TestBackupBase {
String backupIdIncMultiple = admin.getBackupAdmin().backupTables(request);
assertTrue(checkSucceeded(backupIdIncMultiple));
+ // add column family f2 to table1
+ final byte[] fam2Name = Bytes.toBytes("f2");
+ table1Desc.addFamily(new HColumnDescriptor(fam2Name));
+ // drop column family f3
+ table1Desc.removeFamily(fam3Name);
+ HBaseTestingUtility.modifyTableSync(TEST_UTIL.getAdmin(), table1Desc);
+
+ int NB_ROWS_FAM2 = 7;
+ HTable t3 = insertIntoTable(conn, table1, fam2Name, 2, NB_ROWS_FAM2);
+ t3.close();
+
+ // #3 - incremental backup for multiple tables
+ request = new BackupRequest();
+ request.setBackupType(BackupType.INCREMENTAL).setTableList(tables)
+ .setTargetRootDir(BACKUP_ROOT_DIR);
+ String backupIdIncMultiple2 = admin.getBackupAdmin().backupTables(request);
+ assertTrue(checkSucceeded(backupIdIncMultiple2));
+
// #4 - restore full backup for all tables, without overwrite
TableName[] tablesRestoreFull =
new TableName[] { table1, table2 };
@@ -118,6 +141,7 @@ public class TestIncrementalBackup extends TestBackupBase {
new TableName[] { table1_restore, table2_restore };
BackupAdmin client = getBackupAdmin();
+ LOG.debug("Restoring full " + backupIdFull);
client.restore(createRestoreRequest(BACKUP_ROOT_DIR, backupIdFull, false,
tablesRestoreFull,
tablesMapFull, false));
@@ -131,7 +155,8 @@ public class TestIncrementalBackup extends TestBackupBase {
// #5.2 - checking row count of tables for full restore
HTable hTable = (HTable) conn.getTable(table1_restore);
- Assert.assertThat(TEST_UTIL.countRows(hTable), CoreMatchers.equalTo(NB_ROWS_IN_BATCH));
+ Assert.assertThat(TEST_UTIL.countRows(hTable), CoreMatchers.equalTo(NB_ROWS_IN_BATCH +
+ NB_ROWS_FAM3));
hTable.close();
hTable = (HTable) conn.getTable(table2_restore);
@@ -143,11 +168,16 @@ public class TestIncrementalBackup extends TestBackupBase {
new TableName[] { table1, table2 };
TableName[] tablesMapIncMultiple =
new TableName[] { table1_restore, table2_restore };
- client.restore(createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple, false,
+ client.restore(createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple2, false,
tablesRestoreIncMultiple, tablesMapIncMultiple, true));
hTable = (HTable) conn.getTable(table1_restore);
- Assert.assertThat(TEST_UTIL.countRows(hTable), CoreMatchers.equalTo(NB_ROWS_IN_BATCH * 2));
+ LOG.debug("After incremental restore: " + hTable.getTableDescriptor());
+ LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows");
+ Assert.assertThat(TEST_UTIL.countRows(hTable, famName),
+ CoreMatchers.equalTo(NB_ROWS_IN_BATCH * 2));
+ LOG.debug("f2 has " + TEST_UTIL.countRows(hTable, fam2Name) + " rows");
+ Assert.assertThat(TEST_UTIL.countRows(hTable, fam2Name), CoreMatchers.equalTo(NB_ROWS_FAM2));
hTable.close();
hTable = (HTable) conn.getTable(table2_restore);
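
Taken together, the test drives the layout change end to end and, because the schema differs per phase, verifies row counts per family rather than per table. A compressed sketch of the final check (names from the test):

    // Timeline: FULL backup (f1 + f3 populated) -> more f1 rows -> incremental #1
    // -> add f2 / drop f3, write f2 rows -> incremental #2 -> restore FULL,
    // then restore incremental #2 and verify per family:
    Assert.assertThat(TEST_UTIL.countRows(hTable, famName),   // f1: both batches
        CoreMatchers.equalTo(NB_ROWS_IN_BATCH * 2));
    Assert.assertThat(TEST_UTIL.countRows(hTable, fam2Name),  // f2: post-change rows
        CoreMatchers.equalTo(NB_ROWS_FAM2));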