You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2018/03/10 17:38:29 UTC
hbase git commit: HBASE-19969: Improve fault tolerance in backup
Merge operation
Repository: hbase
Updated Branches:
refs/heads/master 19a396b9c -> d5aaeee88
HBASE-19969: Improve fault tolerance in backup Merge operation
Signed-off-by: tedyu <yu...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d5aaeee8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d5aaeee8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d5aaeee8
Branch: refs/heads/master
Commit: d5aaeee88b331e064830a2774f4fed238631457c
Parents: 19a396b
Author: Vladimir Rodionov <vr...@hortonworks.com>
Authored: Fri Mar 9 16:49:55 2018 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Sat Mar 10 09:37:55 2018 -0800
----------------------------------------------------------------------
.../hadoop/hbase/backup/HBackupFileSystem.java | 19 +++
.../hbase/backup/impl/BackupAdminImpl.java | 1 +
.../hbase/backup/impl/BackupCommands.java | 66 ++++++--
.../mapreduce/MapReduceBackupMergeJob.java | 161 ++++++++++++-------
.../hadoop/hbase/backup/util/BackupUtils.java | 12 ++
.../hadoop/hbase/backup/TestBackupMerge.java | 132 +++++++++++++++
.../TestIncrementalBackupMergeWithFailures.java | 40 ++++-
7 files changed, 351 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/d5aaeee8/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
index ff65a64..e097554 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
@@ -63,6 +63,25 @@ public final class HBackupFileSystem {
+ Path.SEPARATOR;
}
+ /**
+ * Get the backup temporary directory, i.e. the ".tmp" child of the backup root
+ * @param backupRootDir backup root directory
+ * @return path of the ".tmp" directory under backupRootDir
+ */
+ public static Path getBackupTmpDirPath(String backupRootDir) {
+ return new Path(backupRootDir, ".tmp");
+ }
+
+ /**
+ * Get the temporary directory for a given backup id: the backupId child of the backup tmp dir
+ * @param backupRoot backup root directory
+ * @param backupId backup id, used as the last path component
+ * @return path of the tmp directory for backupId
+ */
+ public static Path getBackupTmpDirPathForBackupId(String backupRoot, String backupId) {
+ return new Path(getBackupTmpDirPath(backupRoot), backupId);
+ }
+
public static String getTableBackupDataDir(String backupRootDir, String backupId,
TableName tableName) {
return getTableBackupDir(backupRootDir, backupId, tableName) + Path.SEPARATOR + "data";
http://git-wip-us.apache.org/repos/asf/hbase/blob/d5aaeee8/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
index 776cc1b..0d20f37 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
@@ -619,6 +619,7 @@ public class BackupAdminImpl implements BackupAdmin {
public void mergeBackups(String[] backupIds) throws IOException {
try (final BackupSystemTable sysTable = new BackupSystemTable(conn)) {
checkIfValidForMerge(backupIds, sysTable);
+ //TODO run job on remote cluster
BackupMergeJob job = BackupRestoreFactory.getBackupMergeJob(conn.getConfiguration());
job.run(backupIds);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d5aaeee8/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
index c310178..3c43bb6 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
@@ -58,13 +58,13 @@ import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants.BackupCommand;
import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.util.BackupSet;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
-
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
@@ -403,7 +403,7 @@ public final class BackupCommands {
}
}
- private static class HelpCommand extends Command {
+ public static class HelpCommand extends Command {
HelpCommand(Configuration conf, CommandLine cmdline) {
super(conf);
this.cmdline = cmdline;
@@ -454,7 +454,7 @@ public final class BackupCommands {
}
}
- private static class DescribeCommand extends Command {
+ public static class DescribeCommand extends Command {
DescribeCommand(Configuration conf, CommandLine cmdline) {
super(conf);
this.cmdline = cmdline;
@@ -492,7 +492,7 @@ public final class BackupCommands {
}
}
- private static class ProgressCommand extends Command {
+ public static class ProgressCommand extends Command {
ProgressCommand(Configuration conf, CommandLine cmdline) {
super(conf);
this.cmdline = cmdline;
@@ -547,7 +547,7 @@ public final class BackupCommands {
}
}
- private static class DeleteCommand extends Command {
+ public static class DeleteCommand extends Command {
DeleteCommand(Configuration conf, CommandLine cmdline) {
super(conf);
this.cmdline = cmdline;
@@ -586,7 +586,7 @@ public final class BackupCommands {
}
}
- private static class RepairCommand extends Command {
+ public static class RepairCommand extends Command {
RepairCommand(Configuration conf, CommandLine cmdline) {
super(conf);
this.cmdline = cmdline;
@@ -661,8 +661,9 @@ public final class BackupCommands {
System.out.println("DELETE operation finished OK: " + StringUtils.join(backupIds));
}
- private void repairFailedBackupMergeIfAny(Connection conn, BackupSystemTable sysTable)
+ public static void repairFailedBackupMergeIfAny(Connection conn, BackupSystemTable sysTable)
throws IOException {
+
String[] backupIds = sysTable.getListOfBackupIdsFromMergeOperation();
if (backupIds == null || backupIds.length == 0) {
System.out.println("No failed backup MERGE operation found");
@@ -671,17 +672,52 @@ public final class BackupCommands {
return;
}
System.out.println("Found failed MERGE operation for: " + StringUtils.join(backupIds));
- System.out.println("Running MERGE again ...");
+ // Check if backup .tmp exists
+ BackupInfo bInfo = sysTable.readBackupInfo(backupIds[0]);
+ String backupRoot = bInfo.getBackupRootDir();
+ FileSystem fs = FileSystem.get(new Path(backupRoot).toUri(), new Configuration());
+ String backupId = BackupUtils.findMostRecentBackupId(backupIds);
+ Path tmpPath = HBackupFileSystem.getBackupTmpDirPathForBackupId(backupRoot, backupId);
+ if (fs.exists(tmpPath)) {
+ // Move data back
+ Path destPath = HBackupFileSystem.getBackupPath(backupRoot, backupId);
+ if (!fs.delete(destPath, true)) {
+ System.out.println("Failed to delete " + destPath);
+ }
+ boolean res = fs.rename(tmpPath, destPath);
+ if (!res) {
+ throw new IOException("MERGE repair: failed to rename from "+ tmpPath+" to "+ destPath);
+ }
+ System.out.println("MERGE repair: renamed from "+ tmpPath+" to "+ destPath+" res="+ res);
+ } else {
+ checkRemoveBackupImages(fs, backupRoot, backupIds);
+ }
// Restore table from snapshot
BackupSystemTable.restoreFromSnapshot(conn);
- // Unlock backupo system
+ // Unlock backup system
sysTable.finishBackupExclusiveOperation();
// Finish previous failed session
sysTable.finishMergeOperation();
- try (BackupAdmin admin = new BackupAdminImpl(conn)) {
- admin.mergeBackups(backupIds);
+
+ System.out.println("MERGE repair operation finished OK: " + StringUtils.join(backupIds));
+ }
+
+ private static void checkRemoveBackupImages(FileSystem fs, String backupRoot,
+ String[] backupIds) throws IOException {
+ String mergedBackupId = BackupUtils.findMostRecentBackupId(backupIds);
+ for (String backupId: backupIds) {
+ if (backupId.equals(mergedBackupId)) {
+ continue;
+ }
+ Path path = HBackupFileSystem.getBackupPath(backupRoot, backupId);
+ if (fs.exists(path)) {
+ if (!fs.delete(path, true)) {
+ System.out.println("MERGE repair removing: "+ path +" - FAILED");
+ } else {
+ System.out.println("MERGE repair removing: "+ path +" - OK");
+ }
+ }
}
- System.out.println("MERGE operation finished OK: " + StringUtils.join(backupIds));
}
@Override
@@ -690,7 +726,7 @@ public final class BackupCommands {
}
}
- private static class MergeCommand extends Command {
+ public static class MergeCommand extends Command {
MergeCommand(Configuration conf, CommandLine cmdline) {
super(conf);
this.cmdline = cmdline;
@@ -739,7 +775,7 @@ public final class BackupCommands {
}
}
- private static class HistoryCommand extends Command {
+ public static class HistoryCommand extends Command {
private final static int DEFAULT_HISTORY_LENGTH = 10;
HistoryCommand(Configuration conf, CommandLine cmdline) {
@@ -862,7 +898,7 @@ public final class BackupCommands {
}
}
- private static class BackupSetCommand extends Command {
+ public static class BackupSetCommand extends Command {
private final static String SET_ADD_CMD = "add";
private final static String SET_REMOVE_CMD = "remove";
private final static String SET_DELETE_CMD = "delete";
http://git-wip-us.apache.org/repos/asf/hbase/blob/d5aaeee8/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
index 3fcf692..f71b62a 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
@@ -19,12 +19,13 @@ package org.apache.hadoop.hbase.backup.mapreduce;
import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
-import java.io.FileNotFoundException;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.Stack;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -36,7 +37,6 @@ import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupMergeJob;
-import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.impl.BackupManifest;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
@@ -105,7 +105,7 @@ public class MapReduceBackupMergeJob implements BackupMergeJob {
table.startMergeOperation(backupIds);
// Select most recent backup id
- String mergedBackupId = findMostRecentBackupId(backupIds);
+ String mergedBackupId = BackupUtils.findMostRecentBackupId(backupIds);
TableName[] tableNames = getTableNamesInBackupImages(backupIds);
@@ -146,15 +146,34 @@ public class MapReduceBackupMergeJob implements BackupMergeJob {
table.updateProcessedTablesForMerge(tableList);
finishedTables = true;
- // Move data
+ // PHASE 2 (modification of a backup file system)
+ // Move existing mergedBackupId data into tmp directory
+ // we will need it later in case of a failure
+ Path tmpBackupDir = HBackupFileSystem.getBackupTmpDirPathForBackupId(backupRoot,
+ mergedBackupId);
+ Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, mergedBackupId);
+
+ if (!fs.rename(backupDirPath, tmpBackupDir)) {
+ throw new IOException("Failed to rename "+ backupDirPath +" to "+tmpBackupDir);
+ } else {
+ LOG.debug("Renamed "+ backupDirPath +" to "+ tmpBackupDir);
+ }
+ // Move new data into backup dest
for (Pair<TableName, Path> tn : processedTableList) {
moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
}
-
- // Delete old data and update manifest
+ // Update backup manifest
List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
+ updateBackupManifest(tmpBackupDir.getParent().toString(), mergedBackupId, backupsToDelete);
+ // Copy meta files back from tmp to backup dir
+ copyMetaData(fs, tmpBackupDir, backupDirPath);
+ // Delete tmp dir (Rename back during repair)
+ if (!fs.delete(tmpBackupDir, true)) {
+ // WARN and ignore
+ LOG.warn("Could not delete tmp dir: "+ tmpBackupDir);
+ }
+ // Delete old data
deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
- updateBackupManifest(backupRoot, mergedBackupId, backupsToDelete);
// Finish merge session
table.finishMergeOperation();
// Release lock
@@ -183,6 +202,80 @@ public class MapReduceBackupMergeJob implements BackupMergeJob {
}
}
+ /**
+ * Copy the meta data files of a backup session from the tmp directory to the backup directory
+ * @param fs file system
+ * @param tmpBackupDir temp backup directory, where the meta files are located
+ * @param backupDirPath destination backup directory the meta files are copied into
+ * @throws IOException if listing or copying the files fails
+ */
+ protected void copyMetaData(FileSystem fs, Path tmpBackupDir, Path backupDirPath)
+ throws IOException {
+ RemoteIterator<LocatedFileStatus> it = fs.listFiles(tmpBackupDir, true);
+ List<Path> toKeep = new ArrayList<Path>();
+ while (it.hasNext()) {
+ Path p = it.next().getPath();
+ if (fs.isDirectory(p)) {
+ continue;
+ }
+ // Keep meta: files whose full path contains the table-info dir or region-info file name
+ String fileName = p.toString();
+ if (fileName.indexOf(FSTableDescriptors.TABLEINFO_DIR) > 0
+ || fileName.indexOf(HRegionFileSystem.REGION_INFO_FILE) > 0) {
+ toKeep.add(p);
+ }
+ }
+ // Copy each kept meta file to its converted path under backupDirPath
+ for (Path p : toKeep) {
+ Path newPath = convertToDest(p, backupDirPath);
+ copyFile(fs, p, newPath);
+ }
+ }
+
+ /**
+ * Copy a file from p to newPath on fs, staging it through a local temp file
+ * @param fs file system holding both the source and the destination
+ * @param p source path
+ * @param newPath destination path
+ * @throws IOException if a copy step fails or newPath does not exist afterwards
+ */
+ protected void copyFile(FileSystem fs, Path p, Path newPath) throws IOException {
+ File f = File.createTempFile("data", "meta"); // NOTE(review): never deleted in this method
+ Path localPath = new Path(f.getAbsolutePath());
+ fs.copyToLocalFile(p, localPath);
+ fs.copyFromLocalFile(localPath, newPath);
+ boolean exists = fs.exists(newPath);
+ if (!exists) {
+ throw new IOException("Failed to copy meta file to: "+ newPath);
+ }
+ }
+
+/**
+ * Re-roots p under backupDirPath, keeping the components of p below the backup id directory
+ * @param p path to convert; assumes one of its components equals the backup id - TODO confirm
+ * @param backupDirPath backup directory; its last component is taken as the backup id
+ * @return path under backupDirPath with the same trailing components as p
+ */
+ protected Path convertToDest(Path p, Path backupDirPath) {
+ String backupId = backupDirPath.getName();
+ Stack<String> stack = new Stack<String>();
+ String name = null;
+ while (true) { // walk up from p, collecting names until the backup id component is reached
+ name = p.getName();
+ if (!name.equals(backupId)) {
+ stack.push(name);
+ p = p.getParent();
+ } else {
+ break;
+ }
+ }
+ Path newPath = new Path(backupDirPath.toString());
+ while (!stack.isEmpty()) { // re-append the collected names, outermost first
+ newPath = new Path(newPath, stack.pop());
+ }
+ return newPath;
+ }
+
protected List<Path> toPathList(List<Pair<TableName, Path>> processedTableList) {
ArrayList<Path> list = new ArrayList<>();
for (Pair<TableName, Path> p : processedTableList) {
@@ -251,11 +344,6 @@ public class MapReduceBackupMergeJob implements BackupMergeJob {
Path dest =
new Path(HBackupFileSystem.getTableBackupDir(backupRoot, mergedBackupId, tableName));
- // Delete all *data* files in dest
- if (!deleteData(fs, dest)) {
- throw new IOException("Could not delete " + dest);
- }
-
FileStatus[] fsts = fs.listStatus(bulkOutputPath);
for (FileStatus fst : fsts) {
if (fst.isDirectory()) {
@@ -265,54 +353,13 @@ public class MapReduceBackupMergeJob implements BackupMergeJob {
if (!fs.delete(newDst, true)) {
throw new IOException("failed to delete :"+ newDst);
}
+ } else {
+ fs.mkdirs(dest);
}
- fs.rename(fst.getPath(), dest);
- }
- }
- }
-
- /**
- * Deletes only data files and keeps all META
- * @param fs file system instance
- * @param dest destination location
- * @return true, if success, false - otherwise
- * @throws FileNotFoundException exception
- * @throws IOException exception
- */
- private boolean deleteData(FileSystem fs, Path dest) throws FileNotFoundException, IOException {
- RemoteIterator<LocatedFileStatus> it = fs.listFiles(dest, true);
- List<Path> toDelete = new ArrayList<Path>();
- while (it.hasNext()) {
- Path p = it.next().getPath();
- if (fs.isDirectory(p)) {
- continue;
- }
- // Keep meta
- String fileName = p.toString();
- if (fileName.indexOf(FSTableDescriptors.TABLEINFO_DIR) > 0 ||
- fileName.indexOf(HRegionFileSystem.REGION_INFO_FILE) > 0) {
- continue;
- }
- toDelete.add(p);
- }
- for (Path p : toDelete) {
- boolean result = fs.delete(p, false);
- if (!result) {
- return false;
- }
- }
- return true;
- }
-
- protected String findMostRecentBackupId(String[] backupIds) {
- long recentTimestamp = Long.MIN_VALUE;
- for (String backupId : backupIds) {
- long ts = Long.parseLong(backupId.split("_")[1]);
- if (ts > recentTimestamp) {
- recentTimestamp = ts;
+ boolean result = fs.rename(fst.getPath(), dest);
+ LOG.debug("MoveData from "+ fst.getPath() +" to "+ dest+" result="+ result);
}
}
- return BackupRestoreConstants.BACKUPID_PREFIX + recentTimestamp;
}
protected TableName[] getTableNamesInBackupImages(String[] backupIds) throws IOException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/d5aaeee8/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
index 18548f5..96ecab9 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -740,4 +740,16 @@ public final class BackupUtils {
}
return loader;
}
+ /** Returns BACKUPID_PREFIX followed by the largest timestamp parsed from backupIds. */
+ public static String findMostRecentBackupId(String[] backupIds) {
+ long recentTimestamp = Long.MIN_VALUE; // stays MIN_VALUE if backupIds is empty
+ for (String backupId : backupIds) {
+ long ts = Long.parseLong(backupId.split("_")[1]); // numeric field after the first '_'
+ if (ts > recentTimestamp) {
+ recentTimestamp = ts;
+ }
+ }
+ return BackupRestoreConstants.BACKUPID_PREFIX + recentTimestamp;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d5aaeee8/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java
new file mode 100644
index 0000000..8ead548
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category(LargeTests.class)
+public class TestBackupMerge extends TestBackupBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestBackupMerge.class);
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestBackupMerge.class);
+
+
+ /** Full backup, two incremental backups, merge of the incrementals, restore and row counts. */
+ @Test
+ public void TestIncBackupMergeRestore() throws Exception {
+ int ADD_ROWS = 99;
+ // #1 - create full backup for all tables
+ LOG.info("create full backup image for all tables");
+
+ List<TableName> tables = Lists.newArrayList(table1, table2);
+ // No custom Merge Job implementation is configured in this test
+
+
+ Connection conn = ConnectionFactory.createConnection(conf1);
+
+ HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
+ BackupAdminImpl client = new BackupAdminImpl(conn);
+
+ BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
+ String backupIdFull = client.backupTables(request);
+
+ assertTrue(checkSucceeded(backupIdFull));
+
+ // #2 - insert some data into table1 and table2
+ HTable t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+ LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
+
+ Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS);
+ t1.close();
+ LOG.debug("written " + ADD_ROWS + " rows to " + table1);
+
+ HTable t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);
+
+ Assert.assertEquals(TEST_UTIL.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS);
+ t2.close();
+ LOG.debug("written " + ADD_ROWS + " rows to " + table2);
+
+ // #3 - first incremental backup for multiple tables
+ tables = Lists.newArrayList(table1, table2);
+ request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
+ String backupIdIncMultiple = client.backupTables(request);
+
+ assertTrue(checkSucceeded(backupIdIncMultiple));
+
+ t1 = insertIntoTable(conn, table1, famName, 2, ADD_ROWS);
+ t1.close();
+
+ t2 = insertIntoTable(conn, table2, famName, 2, ADD_ROWS);
+ t2.close();
+
+ // #4 - second incremental backup for multiple tables, taken after the extra inserts
+ request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
+ String backupIdIncMultiple2 = client.backupTables(request);
+ assertTrue(checkSucceeded(backupIdIncMultiple2));
+
+ try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) { // #5 - merge both incremental backups
+ String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
+ bAdmin.mergeBackups(backups);
+ }
+
+ // #6 - restore from backupIdIncMultiple2 after the merge, multiple tables, with overwrite
+ TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 };
+ TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, table2_restore };
+ client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple2, false,
+ tablesRestoreIncMultiple, tablesMapIncMultiple, true));
+
+ Table hTable = conn.getTable(table1_restore);
+ LOG.debug("After incremental restore: " + hTable.getTableDescriptor());
+ int countRows = TEST_UTIL.countRows(hTable, famName);
+ LOG.debug("f1 has " + countRows + " rows");
+ Assert.assertEquals(NB_ROWS_IN_BATCH + 2 * ADD_ROWS, countRows);
+
+ hTable.close();
+
+ hTable = conn.getTable(table2_restore);
+ Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH + 2 * ADD_ROWS);
+ hTable.close();
+
+ admin.close();
+ conn.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d5aaeee8/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
index 7ce5050..57bdc46 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.backup.impl.BackupCommands;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
@@ -113,7 +114,7 @@ public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {
table.startMergeOperation(backupIds);
// Select most recent backup id
- String mergedBackupId = findMostRecentBackupId(backupIds);
+ String mergedBackupId = BackupUtils.findMostRecentBackupId(backupIds);
TableName[] tableNames = getTableNamesInBackupImages(backupIds);
@@ -160,18 +161,38 @@ public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {
table.updateProcessedTablesForMerge(tableList);
finishedTables = true;
- // Move data
+ // (modification of a backup file system)
+ // Move existing mergedBackupId data into tmp directory
+ // we will need it later in case of a failure
+ Path tmpBackupDir = HBackupFileSystem.getBackupTmpDirPathForBackupId(backupRoot,
+ mergedBackupId);
+ Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, mergedBackupId);
+ if (!fs.rename(backupDirPath, tmpBackupDir)) {
+ throw new IOException("Failed to rename "+ backupDirPath +" to "+tmpBackupDir);
+ } else {
+ LOG.debug("Renamed "+ backupDirPath +" to "+ tmpBackupDir);
+ }
+ // Move new data into backup dest
for (Pair<TableName, Path> tn : processedTableList) {
moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
}
- // PHASE 4
checkFailure(FailurePhase.PHASE4);
- // Delete old data and update manifest
+ // Update backup manifest
List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
+ updateBackupManifest(tmpBackupDir.getParent().toString(), mergedBackupId, backupsToDelete);
+ // Copy meta files back from tmp to backup dir
+ copyMetaData(fs, tmpBackupDir, backupDirPath);
+ // Delete tmp dir (Rename back during repair)
+ if (!fs.delete(tmpBackupDir, true)) {
+ // WARN and ignore
+ LOG.warn("Could not delete tmp dir: "+ tmpBackupDir);
+ }
+ // Delete old data
deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
- updateBackupManifest(backupRoot, mergedBackupId, backupsToDelete);
// Finish merge session
table.finishMergeOperation();
+ // Release lock
+ table.finishBackupExclusiveOperation();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
@@ -285,8 +306,8 @@ public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {
Assert.fail("IOException is expected");
} catch(IOException ee) {
// Expected - clean up before proceeding
- table.finishMergeOperation();
- table.finishBackupExclusiveOperation();
+ //table.finishMergeOperation();
+ //table.finishBackupExclusiveOperation();
}
}
table.close();
@@ -297,7 +318,10 @@ public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {
Configuration conf = conn.getConfiguration();
conf.unset(FAILURE_PHASE_KEY);
conf.unset(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS);
-
+ // Now run repair
+ BackupSystemTable sysTable = new BackupSystemTable(conn);
+ BackupCommands.RepairCommand.repairFailedBackupMergeIfAny(conn, sysTable);
+ // Now repeat merge
try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) {
String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
bAdmin.mergeBackups(backups);