Posted to commits@hbase.apache.org by el...@apache.org on 2018/01/12 19:07:19 UTC
hbase git commit: HBASE-19568: Restore of HBase table using incremental backup doesn't restore rows from an earlier incremental backup
Repository: hbase
Updated Branches:
refs/heads/master 057e80c16 -> a5601c8ea
HBASE-19568: Restore of HBase table using incremental backup doesn't restore rows from an earlier incremental backup
Signed-off-by: Josh Elser <el...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a5601c8e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a5601c8e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a5601c8e
Branch: refs/heads/master
Commit: a5601c8eac6bfcac7d869574547f505d44e49065
Parents: 057e80c
Author: Vladimir Rodionov <vr...@hortonworks.com>
Authored: Wed Jan 10 16:26:09 2018 -0800
Committer: Josh Elser <el...@apache.org>
Committed: Fri Jan 12 13:13:17 2018 -0500
----------------------------------------------------------------------
.../hbase/backup/impl/BackupAdminImpl.java | 2 +-
.../hadoop/hbase/backup/impl/BackupManager.java | 19 +--
.../hbase/backup/impl/BackupSystemTable.java | 135 +++++++++++++------
.../impl/IncrementalTableBackupClient.java | 59 ++++++--
.../hbase/backup/impl/RestoreTablesClient.java | 55 ++++----
.../hbase/backup/impl/TableBackupClient.java | 32 ++---
.../hadoop/hbase/backup/TestBackupBase.java | 4 -
.../TestIncrementalBackupWithBulkLoad.java | 24 +++-
8 files changed, 213 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5601c8e/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
index 8ba57d2..f27490c 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
@@ -271,7 +271,7 @@ public class BackupAdminImpl implements BackupAdmin {
LOG.debug(numDeleted + " bulk loaded files out of " + map.size() + " were deleted");
}
if (success) {
- sysTable.deleteBulkLoadedFiles(map);
+ sysTable.deleteBulkLoadedRows(new ArrayList<byte[]>(map.keySet()));
}
sysTable.deleteBackupInfo(backupInfo.getBackupId());
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5601c8e/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index 7199fd5..4ca998c 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
+import org.apache.hadoop.hbase.backup.BackupObserver;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.backup.regionserver.LogRollRegionServerProcedureManager;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.procedure.ProcedureManagerHost;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
@@ -140,10 +142,14 @@ public class BackupManager implements Closeable {
conf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY, classes + ","
+ regionProcedureClass);
}
+ String coproc = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
+ String regionObserverClass = BackupObserver.class.getName();
+ conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, (coproc == null ? "" : coproc + ",") +
+ regionObserverClass);
if (LOG.isDebugEnabled()) {
- LOG.debug("Added region procedure manager: " + regionProcedureClass);
+ LOG.debug("Added region procedure manager: " + regionProcedureClass +
+ ". Added region observer: " + regionObserverClass);
}
-
}
public static boolean isBackupEnabled(Configuration conf) {
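This hunk makes decorateRegionServerConfiguration register BackupObserver as a region coprocessor whenever backup is enabled, so the observer no longer has to be wired up by test code (see the TestBackupBase hunk further down). For reference, a minimal standalone sketch of the same append-without-clobbering pattern; com.example.SomeOtherObserver is a hypothetical placeholder for a coprocessor that was already configured:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.backup.BackupObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;

public class RegisterBackupObserverSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Pretend another region observer is already configured (hypothetical class name).
    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "com.example.SomeOtherObserver");
    String existing = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
    // Append BackupObserver without overwriting what is already there.
    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
        (existing == null ? "" : existing + ",") + BackupObserver.class.getName());
    System.out.println(conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY));
  }
}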
@@ -415,13 +421,8 @@ public class BackupManager implements Closeable {
return systemTable.readBulkloadRows(tableList);
}
- public void removeBulkLoadedRows(List<TableName> lst, List<byte[]> rows) throws IOException {
- systemTable.removeBulkLoadedRows(lst, rows);
- }
-
- public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps)
- throws IOException {
- systemTable.writeBulkLoadedFiles(sTableList, maps, backupInfo.getBackupId());
+ public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
+ systemTable.deleteBulkLoadedRows(rows);
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5601c8e/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index 6b721d4..cf34d14 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -42,8 +42,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -53,6 +51,8 @@ import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
@@ -62,6 +62,8 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
@@ -122,7 +124,21 @@ public final class BackupSystemTable implements Closeable {
}
+ /**
+ * Backup system table (main) name
+ */
private TableName tableName;
+
+ /**
+ * Backup System table name for bulk loaded files.
+ * We keep all bulk loaded file references in a separate table
+ * because we have to isolate general backup operations (create, merge, etc.)
+ * from the activity of the RegionObserver that tracks bulk loads,
+ * {@link org.apache.hadoop.hbase.backup.BackupObserver}
+ */
+
+ private TableName bulkLoadTableName;
+
/**
* Stores backup sessions (contexts)
*/
@@ -174,20 +190,29 @@ public final class BackupSystemTable implements Closeable {
public BackupSystemTable(Connection conn) throws IOException {
this.connection = conn;
- tableName = BackupSystemTable.getTableName(conn.getConfiguration());
+ Configuration conf = this.connection.getConfiguration();
+ tableName = BackupSystemTable.getTableName(conf);
+ bulkLoadTableName = BackupSystemTable.getTableNameForBulkLoadedData(conf);
checkSystemTable();
}
private void checkSystemTable() throws IOException {
try (Admin admin = connection.getAdmin()) {
verifyNamespaceExists(admin);
-
+ Configuration conf = connection.getConfiguration();
if (!admin.tableExists(tableName)) {
- HTableDescriptor backupHTD =
- BackupSystemTable.getSystemTableDescriptor(connection.getConfiguration());
+ TableDescriptor backupHTD =
+ BackupSystemTable.getSystemTableDescriptor(conf);
admin.createTable(backupHTD);
}
- waitForSystemTable(admin);
+ if (!admin.tableExists(bulkLoadTableName)) {
+ TableDescriptor blHTD =
+ BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf);
+ admin.createTable(blHTD);
+ }
+ waitForSystemTable(admin, tableName);
+ waitForSystemTable(admin, bulkLoadTableName);
+
}
}
@@ -207,7 +232,7 @@ public final class BackupSystemTable implements Closeable {
}
}
- private void waitForSystemTable(Admin admin) throws IOException {
+ private void waitForSystemTable(Admin admin, TableName tableName) throws IOException {
long TIMEOUT = 60000;
long startTime = EnvironmentEdgeManager.currentTime();
while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) {
@@ -216,10 +241,11 @@ public final class BackupSystemTable implements Closeable {
} catch (InterruptedException e) {
}
if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) {
- throw new IOException("Failed to create backup system table after " + TIMEOUT + "ms");
+ throw new IOException("Failed to create backup system table "+
+ tableName +" after " + TIMEOUT + "ms");
}
}
- LOG.debug("Backup table exists and available");
+ LOG.debug("Backup table "+tableName+" exists and available");
}
@@ -251,7 +277,7 @@ public final class BackupSystemTable implements Closeable {
*/
Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException {
Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
- try (Table table = connection.getTable(tableName);
+ try (Table table = connection.getTable(bulkLoadTableName);
ResultScanner scanner = table.getScanner(scan)) {
Result res = null;
Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@@ -279,7 +305,7 @@ public final class BackupSystemTable implements Closeable {
throws IOException {
Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList == null ? 1 : sTableList.size()];
- try (Table table = connection.getTable(tableName);
+ try (Table table = connection.getTable(bulkLoadTableName);
ResultScanner scanner = table.getScanner(scan)) {
Result res = null;
while ((res = scanner.next()) != null) {
@@ -324,18 +350,6 @@ public final class BackupSystemTable implements Closeable {
}
}
- /*
- * @param map Map of row keys to path of bulk loaded hfile
- */
- void deleteBulkLoadedFiles(Map<byte[], String> map) throws IOException {
- try (Table table = connection.getTable(tableName)) {
- List<Delete> dels = new ArrayList<>();
- for (byte[] row : map.keySet()) {
- dels.add(new Delete(row).addFamily(BackupSystemTable.META_FAMILY));
- }
- table.delete(dels);
- }
- }
/**
* Deletes backup status from backup system table table
@@ -366,7 +380,7 @@ public final class BackupSystemTable implements Closeable {
LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size()
+ " entries");
}
- try (Table table = connection.getTable(tableName)) {
+ try (Table table = connection.getTable(bulkLoadTableName)) {
List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths);
table.put(puts);
LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
@@ -386,7 +400,7 @@ public final class BackupSystemTable implements Closeable {
LOG.debug("write bulk load descriptor to backup " + tabName + " with " + pairs.size()
+ " entries");
}
- try (Table table = connection.getTable(tableName)) {
+ try (Table table = connection.getTable(bulkLoadTableName)) {
List<Put> puts =
BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs);
table.put(puts);
@@ -399,8 +413,8 @@ public final class BackupSystemTable implements Closeable {
* @param lst list of table names
* @param rows the rows to be deleted
*/
- public void removeBulkLoadedRows(List<TableName> lst, List<byte[]> rows) throws IOException {
- try (Table table = connection.getTable(tableName)) {
+ public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
+ try (Table table = connection.getTable(bulkLoadTableName)) {
List<Delete> lstDels = new ArrayList<>();
for (byte[] row : rows) {
Delete del = new Delete(row);
@@ -408,7 +422,7 @@ public final class BackupSystemTable implements Closeable {
LOG.debug("orig deleting the row: " + Bytes.toString(row));
}
table.delete(lstDels);
- LOG.debug("deleted " + rows.size() + " original bulkload rows for " + lst.size() + " tables");
+ LOG.debug("deleted " + rows.size() + " original bulkload rows");
}
}
@@ -425,7 +439,7 @@ public final class BackupSystemTable implements Closeable {
for (TableName tTable : tableList) {
Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable);
Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable);
- try (Table table = connection.getTable(tableName);
+ try (Table table = connection.getTable(bulkLoadTableName);
ResultScanner scanner = table.getScanner(scan)) {
Result res = null;
while ((res = scanner.next()) != null) {
@@ -480,7 +494,7 @@ public final class BackupSystemTable implements Closeable {
*/
public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps,
String backupId) throws IOException {
- try (Table table = connection.getTable(tableName)) {
+ try (Table table = connection.getTable(bulkLoadTableName)) {
long ts = EnvironmentEdgeManager.currentTime();
int cnt = 0;
List<Put> puts = new ArrayList<>();
@@ -1311,21 +1325,28 @@ public final class BackupSystemTable implements Closeable {
* Get backup system table descriptor
* @return table's descriptor
*/
- public static HTableDescriptor getSystemTableDescriptor(Configuration conf) {
+ public static TableDescriptor getSystemTableDescriptor(Configuration conf) {
+
+ TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(getTableName(conf));
+
+ ColumnFamilyDescriptorBuilder colBuilder =
+ ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY);
- HTableDescriptor tableDesc = new HTableDescriptor(getTableName(conf));
- HColumnDescriptor colSessionsDesc = new HColumnDescriptor(SESSIONS_FAMILY);
- colSessionsDesc.setMaxVersions(1);
- // Time to keep backup sessions (secs)
+ colBuilder.setMaxVersions(1);
Configuration config = HBaseConfiguration.create();
int ttl =
config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
- colSessionsDesc.setTimeToLive(ttl);
- tableDesc.addFamily(colSessionsDesc);
- HColumnDescriptor colMetaDesc = new HColumnDescriptor(META_FAMILY);
- tableDesc.addFamily(colMetaDesc);
- return tableDesc;
+ colBuilder.setTimeToLive(ttl);
+
+ ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
+ builder.addColumnFamily(colSessionsDesc);
+
+ colBuilder =
+ ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
+ colBuilder.setTimeToLive(ttl);
+ builder.addColumnFamily(colBuilder.build());
+ return builder.build();
}
public static TableName getTableName(Configuration conf) {
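The descriptor construction above moves from the deprecated HTableDescriptor/HColumnDescriptor API to the HBase 2 builder API. A minimal standalone sketch of that builder pattern, assuming placeholder table and family names rather than the constants used in the patch:

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;

public class DescriptorBuilderSketch {
  // One column family, a single version and a TTL -- the same shape as
  // getSystemTableDescriptor() above, with hypothetical names.
  public static TableDescriptor build(String table, String family, int ttlSeconds) {
    return TableDescriptorBuilder.newBuilder(TableName.valueOf(table))
        .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family))
            .setMaxVersions(1)
            .setTimeToLive(ttlSeconds)
            .build())
        .build();
  }
}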
@@ -1344,6 +1365,38 @@ public final class BackupSystemTable implements Closeable {
}
/**
+ * Get backup system table descriptor
+ * @return table's descriptor
+ */
+ public static TableDescriptor getSystemTableForBulkLoadedDataDescriptor(Configuration conf) {
+
+ TableDescriptorBuilder builder =
+ TableDescriptorBuilder.newBuilder(getTableNameForBulkLoadedData(conf));
+
+ ColumnFamilyDescriptorBuilder colBuilder =
+ ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY);
+ colBuilder.setMaxVersions(1);
+ Configuration config = HBaseConfiguration.create();
+ int ttl =
+ config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
+ BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
+ colBuilder.setTimeToLive(ttl);
+ ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
+ builder.addColumnFamily(colSessionsDesc);
+ colBuilder =
+ ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
+ colBuilder.setTimeToLive(ttl);
+ builder.addColumnFamily(colBuilder.build());
+ return builder.build();
+ }
+
+ public static TableName getTableNameForBulkLoadedData(Configuration conf) {
+ String name =
+ conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
+ BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk";
+ return TableName.valueOf(name);
+ }
+ /**
* Creates Put operation for a given backup info object
* @param context backup info
* @return put operation
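The new bulk-load tracking table simply reuses the configured backup system table name with a "_bulk" suffix. A small sketch of how the two names relate; the concrete default values come from BackupRestoreConstants, so the names mentioned in the comment are only an assumption:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;

public class BulkLoadTableNameSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    TableName main = BackupSystemTable.getTableName(conf);
    TableName bulk = BackupSystemTable.getTableNameForBulkLoadedData(conf);
    // Prints the configured system table name and the same name plus "_bulk",
    // e.g. something like "backup:system" and "backup:system_bulk".
    System.out.println(main + " and " + bulk);
  }
}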
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5601c8e/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index a966ad3..34d713d 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -72,7 +72,6 @@ public class IncrementalTableBackupClient extends TableBackupClient {
}
protected List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
- FileSystem fs = FileSystem.get(conf);
List<String> list = new ArrayList<String>();
for (String file : incrBackupFileList) {
Path p = new Path(file);
@@ -110,6 +109,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
* @param sTableList list of tables to be backed up
* @return map of table to List of files
*/
+ @SuppressWarnings("unchecked")
protected Map<byte[], List<Path>>[] handleBulkLoad(List<TableName> sTableList) throws IOException {
Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()];
List<String> activeFiles = new ArrayList<String>();
@@ -117,7 +117,6 @@ public class IncrementalTableBackupClient extends TableBackupClient {
Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair =
backupManager.readBulkloadRows(sTableList);
Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = pair.getFirst();
- FileSystem fs = FileSystem.get(conf);
FileSystem tgtFs;
try {
tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
@@ -126,6 +125,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
}
Path rootdir = FSUtils.getRootDir(conf);
Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);
+
for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> tblEntry :
map.entrySet()) {
TableName srcTable = tblEntry.getKey();
@@ -192,26 +192,47 @@ public class IncrementalTableBackupClient extends TableBackupClient {
}
copyBulkLoadedFiles(activeFiles, archiveFiles);
-
- backupManager.writeBulkLoadedFiles(sTableList, mapForSrc);
- backupManager.removeBulkLoadedRows(sTableList, pair.getSecond());
+ backupManager.deleteBulkLoadedRows(pair.getSecond());
return mapForSrc;
}
private void copyBulkLoadedFiles(List<String> activeFiles, List<String> archiveFiles)
- throws IOException
- {
+ throws IOException {
try {
// Enable special mode of BackupDistCp
conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5);
// Copy active files
String tgtDest = backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId();
- if (activeFiles.size() > 0) {
+ int attempt = 1;
+ while (activeFiles.size() > 0) {
+ LOG.info("Copy "+ activeFiles.size() +
+ " active bulk loaded files. Attempt ="+ (attempt++));
String[] toCopy = new String[activeFiles.size()];
activeFiles.toArray(toCopy);
- incrementalCopyHFiles(toCopy, tgtDest);
+ // Active file can be archived during copy operation,
+ // we need to handle this properly
+ try {
+ incrementalCopyHFiles(toCopy, tgtDest);
+ break;
+ } catch (IOException e) {
+ // Check if some files got archived
+ // Update active and archived lists
+ // When file is being moved from active to archive
+ // directory, the number of active files decreases
+
+ int numOfActive = activeFiles.size();
+ updateFileLists(activeFiles, archiveFiles);
+ if (activeFiles.size() < numOfActive) {
+ continue;
+ }
+ // if not - throw exception
+ throw e;
+ }
}
+ // If the incremental copy fails for archived files, we will have partially loaded files
+ // in the backup destination (only files from the active data directory). This is OK,
+ // because the backup will be marked as FAILED and the data will be cleaned up
if (archiveFiles.size() > 0) {
String[] toCopy = new String[archiveFiles.size()];
archiveFiles.toArray(toCopy);
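This is the core behavioral fix on the backup side: copying active bulk-loaded files is retried when files are archived while the copy is in flight. If the copy fails, the active/archived lists are refreshed (updateFileLists, added below) and the copy is retried only when the active set actually shrank; otherwise the failure is rethrown. A generic sketch of that retry-until-stable loop, where copyOnce and refreshLists are hypothetical stand-ins for incrementalCopyHFiles and updateFileLists:

import java.io.IOException;
import java.util.List;

public abstract class RetryOnArchivalSketch {
  // Hypothetical stand-in for incrementalCopyHFiles(toCopy, tgtDest).
  protected abstract void copyOnce(List<String> activeFiles) throws IOException;

  // Hypothetical stand-in for updateFileLists(): moves entries that vanished
  // from the active directory into the archived list.
  protected abstract void refreshLists(List<String> activeFiles, List<String> archivedFiles)
      throws IOException;

  public void copyActiveFiles(List<String> activeFiles, List<String> archivedFiles)
      throws IOException {
    int attempt = 1;
    while (!activeFiles.isEmpty()) {
      System.out.println("Copy attempt " + (attempt++) + " for " + activeFiles.size() + " files");
      try {
        copyOnce(activeFiles);
        break; // copy succeeded, stop retrying
      } catch (IOException e) {
        int before = activeFiles.size();
        refreshLists(activeFiles, archivedFiles);
        if (activeFiles.size() < before) {
          continue; // some files were archived during the copy; retry with the remainder
        }
        throw e; // the failure was not caused by archival, so give up
      }
    }
  }
}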
@@ -224,6 +245,25 @@ public class IncrementalTableBackupClient extends TableBackupClient {
}
+ private void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
+ throws IOException {
+ List<String> newlyArchived = new ArrayList<String>();
+
+ for (String spath : activeFiles) {
+ if (!fs.exists(new Path(spath))) {
+ newlyArchived.add(spath);
+ }
+ }
+
+ if (newlyArchived.size() > 0) {
+ activeFiles.removeAll(newlyArchived);
+ archiveFiles.addAll(newlyArchived);
+ }
+
+ LOG.debug(newlyArchived.size() + " files have been archived.");
+
+ }
+
@Override
public void execute() throws IOException {
@@ -322,7 +362,6 @@ public class IncrementalTableBackupClient extends TableBackupClient {
protected void deleteBulkLoadDirectory() throws IOException {
// delete original bulk load directory on method exit
Path path = getBulkOutputDir();
- FileSystem fs = FileSystem.get(conf);
boolean result = fs.delete(path, true);
if (!result) {
LOG.warn("Could not delete " + path);
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5601c8e/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
index d3d3e06..c6b6bad 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
@@ -21,33 +21,31 @@ package org.apache.hadoop.hbase.backup.impl;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
-import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.backup.util.RestoreTool;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
/**
* Restore table implementation
@@ -171,8 +169,10 @@ public class RestoreTablesClient {
for (int i = 1; i < images.length; i++) {
BackupImage im = images[i];
String fileBackupDir =
- HBackupFileSystem.getTableBackupDataDir(im.getRootDir(), im.getBackupId(), sTable);
- dirList.add(new Path(fileBackupDir));
+ HBackupFileSystem.getTableBackupDir(im.getRootDir(), im.getBackupId(), sTable);
+ List<Path> list = getFilesRecursively(fileBackupDir);
+ dirList.addAll(list);
+
}
String dirs = StringUtils.join(dirList, ",");
@@ -185,6 +185,20 @@ public class RestoreTablesClient {
LOG.info(sTable + " has been successfully restored to " + tTable);
}
+ private List<Path> getFilesRecursively(String fileBackupDir)
+ throws IllegalArgumentException, IOException {
+ FileSystem fs = FileSystem.get((new Path(fileBackupDir)).toUri(), new Configuration());
+ List<Path> list = new ArrayList<Path>();
+ RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path(fileBackupDir), true);
+ while (it.hasNext()) {
+ Path p = it.next().getPath();
+ if (HFile.isHFileFormat(fs, p)) {
+ list.add(p);
+ }
+ }
+ return list;
+ }
+
/**
* Restore operation. Stage 2: resolved Backup Image dependency
* @param backupManifestMap : tableName, Manifest
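On the restore side, intermediate incremental images are no longer restored by replaying bulk-load records from the system table (that block is removed in the next hunk); instead every HFile found under each image's table backup directory is fed to the restore tool directly. A self-contained sketch of the same recursive listing, using only the Hadoop/HBase APIs that appear in the patch:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.io.hfile.HFile;

public class ListHFilesSketch {
  // Recursively lists files under dir and keeps only those in HFile format,
  // mirroring getFilesRecursively() above.
  public static List<Path> listHFiles(String dir, Configuration conf) throws IOException {
    Path root = new Path(dir);
    FileSystem fs = FileSystem.get(root.toUri(), conf);
    List<Path> hfiles = new ArrayList<>();
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(root, true);
    while (it.hasNext()) {
      Path p = it.next().getPath();
      if (HFile.isHFileFormat(fs, p)) {
        hfiles.add(p);
      }
    }
    return hfiles;
  }
}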
@@ -226,27 +240,6 @@ public class RestoreTablesClient {
}
}
}
- try (BackupSystemTable table = new BackupSystemTable(conn)) {
- List<TableName> sTableList = Arrays.asList(sTableArray);
- for (String id : backupIdSet) {
- LOG.debug("restoring bulk load for " + id);
- Map<byte[], List<Path>>[] mapForSrc = table.readBulkLoadedFiles(id, sTableList);
- Map<LoadQueueItem, ByteBuffer> loaderResult;
- conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
- LoadIncrementalHFiles loader = BackupUtils.createLoader(conf);
- for (int i = 0; i < sTableList.size(); i++) {
- if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) {
- loaderResult = loader.run(mapForSrc[i], tTableArray[i]);
- LOG.debug("bulk loading " + sTableList.get(i) + " to " + tTableArray[i]);
- if (loaderResult.isEmpty()) {
- String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " + tTableArray[i];
- LOG.error(msg);
- throw new IOException(msg);
- }
- }
- }
- }
- }
LOG.debug("restoreStage finished");
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5601c8e/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
index 09ab28c..ab24cca 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
@@ -69,6 +69,7 @@ public abstract class TableBackupClient {
protected BackupManager backupManager;
protected BackupInfo backupInfo;
+ protected FileSystem fs;
public TableBackupClient() {
}
@@ -90,6 +91,7 @@ public abstract class TableBackupClient {
this.tableList = request.getTableList();
this.conn = conn;
this.conf = conn.getConfiguration();
+ this.fs = FSUtils.getCurrentFileSystem(conf);
backupInfo =
backupManager.createBackupInfo(backupId, request.getBackupType(), tableList,
request.getTargetRootDir(), request.getTotalTasks(), request.getBandwidth());
@@ -258,22 +260,21 @@ public abstract class TableBackupClient {
}
}
- public static void cleanupAndRestoreBackupSystem (Connection conn, BackupInfo backupInfo,
- Configuration conf) throws IOException
- {
+ public static void cleanupAndRestoreBackupSystem(Connection conn, BackupInfo backupInfo,
+ Configuration conf) throws IOException {
BackupType type = backupInfo.getType();
- // if full backup, then delete HBase snapshots if there already are snapshots taken
- // and also clean up export snapshot log files if exist
- if (type == BackupType.FULL) {
- deleteSnapshots(conn, backupInfo, conf);
- cleanupExportSnapshotLog(conf);
- }
- BackupSystemTable.restoreFromSnapshot(conn);
- BackupSystemTable.deleteSnapshot(conn);
- // clean up the uncompleted data at target directory if the ongoing backup has already entered
- // the copy phase
- // For incremental backup, DistCp logs will be cleaned with the targetDir.
- cleanupTargetDir(backupInfo, conf);
+ // if full backup, then delete HBase snapshots if there already are snapshots taken
+ // and also clean up export snapshot log files if exist
+ if (type == BackupType.FULL) {
+ deleteSnapshots(conn, backupInfo, conf);
+ cleanupExportSnapshotLog(conf);
+ }
+ BackupSystemTable.restoreFromSnapshot(conn);
+ BackupSystemTable.deleteSnapshot(conn);
+ // clean up the uncompleted data at target directory if the ongoing backup has already entered
+ // the copy phase
+ // For incremental backup, DistCp logs will be cleaned with the targetDir.
+ cleanupTargetDir(backupInfo, conf);
}
@@ -355,7 +356,6 @@ public abstract class TableBackupClient {
*/
protected void cleanupDistCpLog(BackupInfo backupInfo, Configuration conf) throws IOException {
Path rootPath = new Path(backupInfo.getHLogTargetDir()).getParent();
- FileSystem fs = FileSystem.get(rootPath.toUri(), conf);
FileStatus[] files = FSUtils.listStatus(fs, rootPath);
if (files == null) {
return;
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5601c8e/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index 8ce1c0e..2be7784 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -57,7 +57,6 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
@@ -297,9 +296,6 @@ public class TestBackupBase {
// setup configuration
SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration());
}
- String coproc = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
- conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, (coproc == null ? "" : coproc + ",") +
- BackupObserver.class.getName());
conf1.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
BackupManager.decorateMasterConfiguration(conf1);
BackupManager.decorateRegionServerConfiguration(conf1);
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5601c8e/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
index db33abe..ed1d010 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
@@ -113,18 +113,32 @@ public class TestIncrementalBackupWithBulkLoad extends TestBackupBase {
request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
String backupIdIncMultiple = client.backupTables(request);
assertTrue(checkSucceeded(backupIdIncMultiple));
+ // #4 bulk load again
+ LOG.debug("bulk loading into " + testName);
+ int actual1 = TestLoadIncrementalHFiles.loadHFiles(testName, table1Desc, TEST_UTIL, famName,
+ qualName, false, null,
+ new byte[][][] { new byte[][] { Bytes.toBytes("ppp"), Bytes.toBytes("qqq") },
+ new byte[][] { Bytes.toBytes("rrr"), Bytes.toBytes("sss") }, },
+ true, false, true, NB_ROWS_IN_BATCH * 2 + actual, NB_ROWS2);
+ // #5 - incremental backup for table1
+ tables = Lists.newArrayList(table1);
+ request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
+ String backupIdIncMultiple1 = client.backupTables(request);
+ assertTrue(checkSucceeded(backupIdIncMultiple1));
+ // Delete all data in table1
+ TEST_UTIL.deleteTableData(table1);
// #5.1 - check tables for full restore */
HBaseAdmin hAdmin = TEST_UTIL.getHBaseAdmin();
// #6 - restore incremental backup for table1
TableName[] tablesRestoreIncMultiple = new TableName[] { table1 };
- TableName[] tablesMapIncMultiple = new TableName[] { table1_restore };
- client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple,
- false, tablesRestoreIncMultiple, tablesMapIncMultiple, true));
+ //TableName[] tablesMapIncMultiple = new TableName[] { table1_restore };
+ client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple1,
+ false, tablesRestoreIncMultiple, tablesRestoreIncMultiple, true));
- HTable hTable = (HTable) conn.getTable(table1_restore);
- Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH * 2+actual);
+ HTable hTable = (HTable) conn.getTable(table1);
+ Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH * 2 + actual + actual1);
request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
backupIdFull = client.backupTables(request);