You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2017/08/14 01:50:25 UTC
[4/4] hbase git commit: HBASE-14135 Merge backup images (Vladimir Rodionov)
HBASE-14135 Merge backup images (Vladimir Rodionov)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/35aa7aae
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/35aa7aae
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/35aa7aae
Branch: refs/heads/branch-2
Commit: 35aa7aae3a0d269d809416f6ff24599517f5b44b
Parents: b4d4446
Author: Josh Elser <el...@apache.org>
Authored: Sun Aug 13 20:55:58 2017 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Sun Aug 13 21:16:39 2017 -0400
----------------------------------------------------------------------
.../apache/hadoop/hbase/backup/BackupAdmin.java | 20 +-
.../hadoop/hbase/backup/BackupDriver.java | 2 +
.../apache/hadoop/hbase/backup/BackupInfo.java | 5 +
.../hadoop/hbase/backup/BackupMergeJob.java | 40 +++
.../hbase/backup/BackupRestoreFactory.java | 20 +-
.../hadoop/hbase/backup/HBackupFileSystem.java | 57 ++--
.../hbase/backup/impl/BackupAdminImpl.java | 213 +++++++++---
.../hbase/backup/impl/BackupCommands.java | 163 ++++++---
.../hadoop/hbase/backup/impl/BackupManager.java | 21 +-
.../hbase/backup/impl/BackupManifest.java | 24 +-
.../hbase/backup/impl/BackupSystemTable.java | 314 ++++++++++-------
.../hbase/backup/impl/RestoreTablesClient.java | 32 +-
.../backup/mapreduce/HFileSplitterJob.java | 181 ----------
.../mapreduce/MapReduceBackupMergeJob.java | 321 ++++++++++++++++++
.../mapreduce/MapReduceHFileSplitterJob.java | 181 ++++++++++
.../backup/mapreduce/MapReduceRestoreJob.java | 84 ++---
.../hadoop/hbase/backup/util/BackupUtils.java | 93 +++--
.../TestIncrementalBackupMergeWithFailures.java | 336 +++++++++++++++++++
.../backup/TestRepairAfterFailedDelete.java | 2 +-
19 files changed, 1574 insertions(+), 535 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java
index 6f642a4..9dc6382 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java
@@ -38,8 +38,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
public interface BackupAdmin extends Closeable {
/**
- * Backup given list of tables fully. This is a synchronous operation.
- * It returns backup id on success or throw exception on failure.
+ * Backup given list of tables fully. This is a synchronous operation. It returns the backup id
+ * on success or throws an exception on failure.
* @param userRequest BackupRequest instance
* @return the backup Id
*/
@@ -61,16 +61,24 @@ public interface BackupAdmin extends Closeable {
*/
BackupInfo getBackupInfo(String backupId) throws IOException;
-
/**
* Delete backup image command
- * @param backupIds backup id list
+ * @param backupIds array of backup ids
* @return total number of deleted sessions
* @throws IOException exception
*/
int deleteBackups(String[] backupIds) throws IOException;
/**
+ * Merge backup images command
+ * @param backupIds array of backup ids of images to be merged
+ * The resulting backup image will have the same backup id as the most
+ * recent image from a list of images to be merged
+ * @throws IOException exception
+ */
+ void mergeBackups(String[] backupIds) throws IOException;
+
+ /**
* Show backup history command
* @param n last n backup sessions
* @return list of backup info objects
@@ -113,7 +121,7 @@ public interface BackupAdmin extends Closeable {
/**
* Add tables to backup set command
* @param name name of backup set.
- * @param tables list of tables to be added to this set.
+ * @param tables array of tables to be added to this set.
* @throws IOException exception
*/
void addToBackupSet(String name, TableName[] tables) throws IOException;
@@ -121,7 +129,7 @@ public interface BackupAdmin extends Closeable {
/**
* Remove tables from backup set
* @param name name of backup set.
- * @param tables list of tables to be removed from this set.
+ * @param tables array of tables to be removed from this set.
* @throws IOException exception
*/
void removeFromBackupSet(String name, TableName[] tables) throws IOException;
http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java
index e2cdb2f..9dd8531 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java
@@ -111,6 +111,8 @@ public class BackupDriver extends AbstractHBaseTool {
type = BackupCommand.SET;
} else if (BackupCommand.REPAIR.name().equalsIgnoreCase(cmd)) {
type = BackupCommand.REPAIR;
+ } else if (BackupCommand.MERGE.name().equalsIgnoreCase(cmd)) {
+ type = BackupCommand.MERGE;
} else {
System.out.println("Unsupported command for backup: " + cmd);
printToolUsage();
http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
index f6a1fe4..1765bf3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
@@ -433,6 +433,11 @@ public class BackupInfo implements Comparable<BackupInfo> {
}
}
+ @Override
+ public String toString() {
+ return backupId;
+ }
+
public byte[] toByteArray() throws IOException {
return toProtosBackupInfo().toByteArray();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupMergeJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupMergeJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupMergeJob.java
new file mode 100644
index 0000000..136782f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupMergeJob.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Backup merge operation job interface. Concrete implementation is provided by backup provider, see
+ * {@link BackupRestoreFactory}
+ */
+
+@InterfaceAudience.Private
+public interface BackupMergeJob extends Configurable {
+
+ /**
+ * Run backup merge operation
+ * @param backupIds backup image ids
+ * @throws IOException
+ */
+ void run(String[] backupIds) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java
index 6d8967a..d72c884 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.backup;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
+import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreJob;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.util.ReflectionUtils;
@@ -32,6 +33,7 @@ public final class BackupRestoreFactory {
public final static String HBASE_INCR_RESTORE_IMPL_CLASS = "hbase.incremental.restore.class";
public final static String HBASE_BACKUP_COPY_IMPL_CLASS = "hbase.backup.copy.class";
+ public final static String HBASE_BACKUP_MERGE_IMPL_CLASS = "hbase.backup.merge.class";
private BackupRestoreFactory() {
throw new AssertionError("Instantiating utility class...");
@@ -40,7 +42,7 @@ public final class BackupRestoreFactory {
/**
* Gets backup restore job
* @param conf configuration
- * @return backup restore task instance
+ * @return backup restore job instance
*/
public static RestoreJob getRestoreJob(Configuration conf) {
Class<? extends RestoreJob> cls =
@@ -53,7 +55,7 @@ public final class BackupRestoreFactory {
/**
* Gets backup copy job
* @param conf configuration
- * @return backup copy task
+ * @return backup copy job instance
*/
public static BackupCopyJob getBackupCopyJob(Configuration conf) {
Class<? extends BackupCopyJob> cls =
@@ -63,4 +65,18 @@ public final class BackupRestoreFactory {
service.setConf(conf);
return service;
}
+
+ /**
+ * Gets backup merge job
+ * @param conf configuration
+ * @return backup merge job instance
+ */
+ public static BackupMergeJob getBackupMergeJob(Configuration conf) {
+ Class<? extends BackupMergeJob> cls =
+ conf.getClass(HBASE_BACKUP_MERGE_IMPL_CLASS, MapReduceBackupMergeJob.class,
+ BackupMergeJob.class);
+ BackupMergeJob service = ReflectionUtils.newInstance(cls, conf);
+ service.setConf(conf);
+ return service;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
index 46044db..1c43e88 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
@@ -49,8 +49,8 @@ public class HBackupFileSystem {
/**
* Given the backup root dir, backup id and the table name, return the backup image location,
* which is also where the backup manifest file is. return value look like:
- * "hdfs://backup.hbase.org:9000/user/biadmin/backup/backup_1396650096738/default/t1_dn/",
- * where "hdfs://backup.hbase.org:9000/user/biadmin/backup" is a backup root directory
+ * "hdfs://backup.hbase.org:9000/user/biadmin/backup/backup_1396650096738/default/t1_dn/", where
+ * "hdfs://backup.hbase.org:9000/user/biadmin/backup" is a backup root directory
* @param backupRootDir backup root directory
* @param backupId backup id
* @param tableName table name
@@ -63,18 +63,26 @@ public class HBackupFileSystem {
+ Path.SEPARATOR;
}
+ public static String getTableBackupDataDir(String backupRootDir, String backupId,
+ TableName tableName) {
+ return getTableBackupDir(backupRootDir, backupId, tableName) + Path.SEPARATOR + "data";
+ }
+
+ public static Path getBackupPath(String backupRootDir, String backupId) {
+ return new Path(backupRootDir + Path.SEPARATOR + backupId);
+ }
+
/**
* Given the backup root dir, backup id and the table name, return the backup image location,
* which is also where the backup manifest file is. return value look like:
- * "hdfs://backup.hbase.org:9000/user/biadmin/backup/backup_1396650096738/default/t1_dn/",
- * where "hdfs://backup.hbase.org:9000/user/biadmin/backup" is a backup root directory
+ * "hdfs://backup.hbase.org:9000/user/biadmin/backup/backup_1396650096738/default/t1_dn/", where
+ * "hdfs://backup.hbase.org:9000/user/biadmin/backup" is a backup root directory
* @param backupRootPath backup root path
* @param tableName table name
* @param backupId backup Id
* @return backupPath for the particular table
*/
- public static Path getTableBackupPath(TableName tableName,
- Path backupRootPath, String backupId) {
+ public static Path getTableBackupPath(TableName tableName, Path backupRootPath, String backupId) {
return new Path(getTableBackupDir(backupRootPath.toString(), backupId, tableName));
}
@@ -94,33 +102,30 @@ public class HBackupFileSystem {
return new Path(getLogBackupDir(backupRootDir, backupId));
}
- private static Path getManifestPath(TableName tableName, Configuration conf, Path backupRootPath,
- String backupId) throws IOException {
- Path manifestPath =
- new Path(getTableBackupPath(tableName, backupRootPath, backupId),
- BackupManifest.MANIFEST_FILE_NAME);
+ // TODO we do not keep WAL files anymore
+ // Move manifest file to other place
+ private static Path getManifestPath(Configuration conf, Path backupRootPath, String backupId)
+ throws IOException {
+ Path manifestPath = null;
FileSystem fs = backupRootPath.getFileSystem(conf);
+ manifestPath =
+ new Path(getBackupPath(backupRootPath.toString(), backupId) + Path.SEPARATOR
+ + BackupManifest.MANIFEST_FILE_NAME);
if (!fs.exists(manifestPath)) {
- // check log dir for incremental backup case
- manifestPath =
- new Path(getLogBackupDir(backupRootPath.toString(), backupId) + Path.SEPARATOR
- + BackupManifest.MANIFEST_FILE_NAME);
- if (!fs.exists(manifestPath)) {
- String errorMsg =
- "Could not find backup manifest " + BackupManifest.MANIFEST_FILE_NAME + " for "
- + backupId + ". File " + manifestPath + " does not exists. Did " + backupId
- + " correspond to previously taken backup ?";
- throw new IOException(errorMsg);
- }
+ String errorMsg =
+ "Could not find backup manifest " + BackupManifest.MANIFEST_FILE_NAME + " for "
+ + backupId + ". File " + manifestPath + " does not exists. Did " + backupId
+ + " correspond to previously taken backup ?";
+ throw new IOException(errorMsg);
}
return manifestPath;
}
- public static BackupManifest getManifest(TableName tableName, Configuration conf,
- Path backupRootPath, String backupId) throws IOException {
+ public static BackupManifest
+ getManifest(Configuration conf, Path backupRootPath, String backupId) throws IOException {
BackupManifest manifest =
- new BackupManifest(conf, getManifestPath(tableName, conf, backupRootPath, backupId));
+ new BackupManifest(conf, getManifestPath(conf, backupRootPath, backupId));
return manifest;
}
@@ -134,7 +139,7 @@ public class HBackupFileSystem {
TableName[] tableArray, Configuration conf, Path backupRootPath, String backupId)
throws IOException {
for (TableName tableName : tableArray) {
- BackupManifest manifest = getManifest(tableName, conf, backupRootPath, backupId);
+ BackupManifest manifest = getManifest(conf, backupRootPath, backupId);
backupManifestMap.put(tableName, manifest);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
index 6e35d92..99fb06c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.backup.impl;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -36,8 +37,10 @@ import org.apache.hadoop.hbase.backup.BackupAdmin;
import org.apache.hadoop.hbase.backup.BackupClientFactory;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
+import org.apache.hadoop.hbase.backup.BackupMergeJob;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.RestoreRequest;
@@ -46,9 +49,8 @@ import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@InterfaceAudience.Private
public class BackupAdminImpl implements BackupAdmin {
@@ -65,12 +67,8 @@ public class BackupAdminImpl implements BackupAdmin {
@Override
public void close() throws IOException {
- if (conn != null) {
- conn.close();
- }
}
-
@Override
public BackupInfo getBackupInfo(String backupId) throws IOException {
BackupInfo backupInfo = null;
@@ -105,12 +103,12 @@ public class BackupAdminImpl implements BackupAdmin {
// is running by using startBackupSession API
// If there is an active session in progress, exception will be thrown
try {
- sysTable.startBackupSession();
+ sysTable.startBackupExclusiveOperation();
deleteSessionStarted = true;
} catch (IOException e) {
LOG.warn("You can not run delete command while active backup session is in progress. \n"
+ "If there is no active backup session running, run backup repair utility to restore \n"
- +"backup system integrity.");
+ + "backup system integrity.");
return -1;
}
@@ -126,7 +124,7 @@ public class BackupAdminImpl implements BackupAdmin {
sysTable.startDeleteOperation(backupIds);
// Step 4: Snapshot backup system table
if (!BackupSystemTable.snapshotExists(conn)) {
- BackupSystemTable.snapshot(conn);
+ BackupSystemTable.snapshot(conn);
} else {
LOG.warn("Backup system table snapshot exists");
}
@@ -154,13 +152,13 @@ public class BackupAdminImpl implements BackupAdmin {
// Fail delete operation
// Step 1
if (snapshotDone) {
- if(BackupSystemTable.snapshotExists(conn)) {
+ if (BackupSystemTable.snapshotExists(conn)) {
BackupSystemTable.restoreFromSnapshot(conn);
// delete snapshot
BackupSystemTable.deleteSnapshot(conn);
// We still have record with unfinished delete operation
- LOG.error("Delete operation failed, please run backup repair utility to restore "+
- "backup system integrity", e);
+ LOG.error("Delete operation failed, please run backup repair utility to restore "
+ + "backup system integrity", e);
throw e;
} else {
LOG.warn("Delete operation succeeded, there were some errors: ", e);
@@ -169,7 +167,7 @@ public class BackupAdminImpl implements BackupAdmin {
} finally {
if (deleteSessionStarted) {
- sysTable.finishBackupSession();
+ sysTable.finishBackupExclusiveOperation();
}
}
}
@@ -206,17 +204,17 @@ public class BackupAdminImpl implements BackupAdmin {
/**
* Delete single backup and all related backups <br>
* Algorithm:<br>
- * Backup type: FULL or INCREMENTAL <br>
- * Is this last backup session for table T: YES or NO <br>
- * For every table T from table list 'tables':<br>
- * if(FULL, YES) deletes only physical data (PD) <br>
- * if(FULL, NO), deletes PD, scans all newer backups and removes T from backupInfo,<br>
- * until we either reach the most recent backup for T in the system or FULL backup<br>
- * which includes T<br>
- * if(INCREMENTAL, YES) deletes only physical data (PD)
- * if(INCREMENTAL, NO) deletes physical data and for table T scans all backup images between last<br>
- * FULL backup, which is older than the backup being deleted and the next FULL backup (if exists) <br>
- * or last one for a particular table T and removes T from list of backup tables.
+ * Backup type: FULL or INCREMENTAL <br>
+ * Is this last backup session for table T: YES or NO <br>
+ * For every table T from table list 'tables':<br>
+ * if(FULL, YES) deletes only physical data (PD) <br>
+ * if(FULL, NO), deletes PD, scans all newer backups and removes T from backupInfo,<br>
+ * until we either reach the most recent backup for T in the system or FULL backup<br>
+ * which includes T<br>
+ * if(INCREMENTAL, YES) deletes only physical data (PD) <br>
+ * if(INCREMENTAL, NO) deletes physical data and for table T scans all backup images between last<br>
+ * FULL backup, which is older than the backup being deleted and the next FULL backup (if exists) <br>
+ * or last one for a particular table T and removes T from list of backup tables.
* @param backupId backup id
* @param sysTable backup system table
* @return total number of deleted backup images
@@ -285,8 +283,9 @@ public class BackupAdminImpl implements BackupAdmin {
return totalDeleted;
}
- private void removeTableFromBackupImage(BackupInfo info, TableName tn,
- BackupSystemTable sysTable) throws IOException {
+ private void
+ removeTableFromBackupImage(BackupInfo info, TableName tn, BackupSystemTable sysTable)
+ throws IOException {
List<TableName> tables = info.getTableNames();
LOG.debug("Remove " + tn + " from " + info.getBackupId() + " tables="
+ info.getTableListAsString());
@@ -485,7 +484,7 @@ public class BackupAdminImpl implements BackupAdmin {
private String[] toStringArray(TableName[] list) {
String[] arr = new String[list.length];
- for(int i=0; i < list.length; i++) {
+ for (int i = 0; i < list.length; i++) {
arr[i] = list[i].toString();
}
return arr;
@@ -521,7 +520,7 @@ public class BackupAdminImpl implements BackupAdmin {
String targetRootDir = request.getTargetRootDir();
List<TableName> tableList = request.getTableList();
- String backupId =BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime();
+ String backupId = BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime();
if (type == BackupType.INCREMENTAL) {
Set<TableName> incrTableSet = null;
try (BackupSystemTable table = new BackupSystemTable(conn)) {
@@ -529,19 +528,20 @@ public class BackupAdminImpl implements BackupAdmin {
}
if (incrTableSet.isEmpty()) {
- String msg = "Incremental backup table set contains no tables. "
- + "You need to run full backup first " +
- (tableList != null ? "on "+StringUtils.join(tableList, ","): "");
+ String msg =
+ "Incremental backup table set contains no tables. "
+ + "You need to run full backup first "
+ + (tableList != null ? "on " + StringUtils.join(tableList, ",") : "");
throw new IOException(msg);
}
- if(tableList != null) {
+ if (tableList != null) {
tableList.removeAll(incrTableSet);
if (!tableList.isEmpty()) {
String extraTables = StringUtils.join(tableList, ",");
- String msg = "Some tables (" + extraTables + ") haven't gone through full backup. "+
- "Perform full backup on " + extraTables + " first, "
- + "then retry the command";
+ String msg =
+ "Some tables (" + extraTables + ") haven't gone through full backup. "
+ + "Perform full backup on " + extraTables + " first, " + "then retry the command";
throw new IOException(msg);
}
}
@@ -584,14 +584,13 @@ public class BackupAdminImpl implements BackupAdmin {
// update table list
BackupRequest.Builder builder = new BackupRequest.Builder();
- request = builder.withBackupType(request.getBackupType()).
- withTableList(tableList).
- withTargetRootDir(request.getTargetRootDir()).
- withBackupSetName(request.getBackupSetName()).
- withTotalTasks(request.getTotalTasks()).
- withBandwidthPerTasks((int)request.getBandwidth()).build();
-
- TableBackupClient client =null;
+ request =
+ builder.withBackupType(request.getBackupType()).withTableList(tableList)
+ .withTargetRootDir(request.getTargetRootDir())
+ .withBackupSetName(request.getBackupSetName()).withTotalTasks(request.getTotalTasks())
+ .withBandwidthPerTasks((int) request.getBandwidth()).build();
+
+ TableBackupClient client = null;
try {
client = BackupClientFactory.create(conn, backupId, request);
} catch (IOException e) {
@@ -613,4 +612,132 @@ public class BackupAdminImpl implements BackupAdmin {
return tableList;
}
+ @Override
+ public void mergeBackups(String[] backupIds) throws IOException {
+ try (final BackupSystemTable sysTable = new BackupSystemTable(conn);) {
+ checkIfValidForMerge(backupIds, sysTable);
+ BackupMergeJob job = BackupRestoreFactory.getBackupMergeJob(conn.getConfiguration());
+ job.run(backupIds);
+ }
+ }
+
+ /**
+ * Verifies that backup images are valid for merge.
+ *
+ * <ul>
+ * <li>All backups MUST be in the same destination
+ * <li>No FULL backups are allowed - only INCREMENTAL
+ * <li>All backups must be in COMPLETE state
+ * <li>No holes in backup list are allowed
+ * </ul>
+ * <p>
+ * @param backupIds list of backup ids
+ * @param table backup system table
+ * @throws IOException
+ */
+ private void checkIfValidForMerge(String[] backupIds, BackupSystemTable table) throws IOException {
+ String backupRoot = null;
+
+ final Set<TableName> allTables = new HashSet<TableName>();
+ final Set<String> allBackups = new HashSet<String>();
+ long minTime = Long.MAX_VALUE, maxTime = Long.MIN_VALUE;
+ for (String backupId : backupIds) {
+ BackupInfo bInfo = table.readBackupInfo(backupId);
+ if (bInfo == null) {
+ String msg = "Backup session " + backupId + " not found";
+ throw new IOException(msg);
+ }
+ if (backupRoot == null) {
+ backupRoot = bInfo.getBackupRootDir();
+ } else if (!bInfo.getBackupRootDir().equals(backupRoot)) {
+ throw new IOException("Found different backup destinations in a list of a backup sessions \n"
+ + "1. " + backupRoot + "\n" + "2. " + bInfo.getBackupRootDir());
+ }
+ if (bInfo.getType() == BackupType.FULL) {
+ throw new IOException("FULL backup image can not be merged for: \n" + bInfo);
+ }
+
+ if (bInfo.getState() != BackupState.COMPLETE) {
+ throw new IOException("Backup image " + backupId
+ + " can not be merged becuase of its state: " + bInfo.getState());
+ }
+ allBackups.add(backupId);
+ allTables.addAll(bInfo.getTableNames());
+ long time = bInfo.getStartTs();
+ if (time < minTime) {
+ minTime = time;
+ }
+ if (time > maxTime) {
+ maxTime = time;
+ }
+ }
+
+
+ final long startRangeTime = minTime;
+ final long endRangeTime = maxTime;
+ final String backupDest = backupRoot;
+ // Check we have no 'holes' in backup id list
+ // Filter 1 : backupRoot
+ // Filter 2 : time range filter
+ // Filter 3 : table filter
+
+ BackupInfo.Filter destinationFilter = new BackupInfo.Filter() {
+
+ @Override
+ public boolean apply(BackupInfo info) {
+ return info.getBackupRootDir().equals(backupDest);
+ }
+ };
+
+ BackupInfo.Filter timeRangeFilter = new BackupInfo.Filter() {
+
+ @Override
+ public boolean apply(BackupInfo info) {
+ long time = info.getStartTs();
+ return time >= startRangeTime && time <= endRangeTime ;
+ }
+ };
+
+ BackupInfo.Filter tableFilter = new BackupInfo.Filter() {
+
+ @Override
+ public boolean apply(BackupInfo info) {
+ List<TableName> tables = info.getTableNames();
+ return !Collections.disjoint(allTables, tables);
+ }
+ };
+
+ BackupInfo.Filter typeFilter = new BackupInfo.Filter() {
+
+ @Override
+ public boolean apply(BackupInfo info) {
+ return info.getType() == BackupType.INCREMENTAL;
+ }
+ };
+
+ BackupInfo.Filter stateFilter = new BackupInfo.Filter() {
+ @Override
+ public boolean apply(BackupInfo info) {
+ return info.getState() == BackupState.COMPLETE;
+ }
+ };
+
+ List<BackupInfo> allInfos =
+ table.getBackupHistory( -1, destinationFilter,
+ timeRangeFilter, tableFilter, typeFilter, stateFilter);
+ if (allInfos.size() != allBackups.size()) {
+ // Yes we have at least one hole in backup image sequence
+ List<String> missingIds = new ArrayList<String>();
+ for(BackupInfo info: allInfos) {
+ if(allBackups.contains(info.getBackupId())) {
+ continue;
+ }
+ missingIds.add(info.getBackupId());
+ }
+ String errMsg =
+ "Sequence of backup ids has 'holes'. The following backup images must be added:" +
+ org.apache.hadoop.util.StringUtils.join(",", missingIds);
+ throw new IOException(errMsg);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
index aa15fba..650ba2e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
@@ -59,16 +59,15 @@ import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* General backup commands, options and usage messages
*/
@InterfaceAudience.Private
-public final class BackupCommands {
+public final class BackupCommands {
public final static String INCORRECT_USAGE = "Incorrect usage";
@@ -79,7 +78,8 @@ public final class BackupCommands {
+ " history show history of all successful backups\n"
+ " progress show the progress of the latest backup request\n"
+ " set backup set management\n"
- + " repair repair backup system table"
+ + " repair repair backup system table\n"
+ + " merge merge backup images\n"
+ "Run \'hbase backup COMMAND -h\' to see help message for each command\n";
public static final String CREATE_CMD_USAGE =
@@ -109,17 +109,20 @@ public final class BackupCommands {
public static final String SET_CMD_USAGE = "Usage: hbase backup set COMMAND [name] [tables]\n"
+ " name Backup set name\n"
- + " tables Comma separated list of tables.\n"
- + "COMMAND is one of:\n" + " add add tables to a set, create a set if needed\n"
+ + " tables Comma separated list of tables.\n" + "COMMAND is one of:\n"
+ + " add add tables to a set, create a set if needed\n"
+ " remove remove tables from a set\n"
+ " list list all backup sets in the system\n"
+ " describe describe set\n" + " delete delete backup set\n";
+ public static final String MERGE_CMD_USAGE = "Usage: hbase backup merge [backup_ids]\n"
+ + " backup_ids Comma separated list of backup image ids.\n";
public static final String USAGE_FOOTER = "";
public static abstract class Command extends Configured {
CommandLine cmdline;
Connection conn;
+
Command(Configuration conf) {
if (conf == null) {
conf = HBaseConfiguration.create();
@@ -140,7 +143,7 @@ public final class BackupCommands {
try (BackupSystemTable table = new BackupSystemTable(conn);) {
List<BackupInfo> sessions = table.getBackupInfos(BackupState.RUNNING);
- if(sessions.size() > 0) {
+ if (sessions.size() > 0) {
System.err.println("Found backup session in a RUNNING state: ");
System.err.println(sessions.get(0));
System.err.println("This may indicate that a previous session has failed abnormally.");
@@ -154,11 +157,19 @@ public final class BackupCommands {
try (BackupSystemTable table = new BackupSystemTable(conn);) {
String[] ids = table.getListOfBackupIdsFromDeleteOperation();
- if(ids !=null && ids.length > 0) {
- System.err.println("Found failed backup delete coommand. ");
+ if (ids != null && ids.length > 0) {
+ System.err.println("Found failed backup DELETE coommand. ");
System.err.println("Backup system recovery is required.");
- throw new IOException("Failed backup delete found, aborted command execution");
+ throw new IOException("Failed backup DELETE found, aborted command execution");
}
+
+ ids = table.getListOfBackupIdsFromMergeOperation();
+ if (ids != null && ids.length > 0) {
+ System.err.println("Found failed backup MERGE coommand. ");
+ System.err.println("Backup system recovery is required.");
+ throw new IOException("Failed backup MERGE found, aborted command execution");
+ }
+
}
}
}
@@ -178,10 +189,10 @@ public final class BackupCommands {
protected boolean requiresNoActiveSession() {
return false;
}
+
/**
- * Command requires consistent state of a backup system
- * Backup system may become inconsistent because of an abnormal
- * termination of a backup session or delete command
+ * Command requires consistent state of a backup system. Backup system may become inconsistent
+ * because of an abnormal termination of a backup session or delete command.
* @return true, if yes
*/
protected boolean requiresConsistentState() {
@@ -220,6 +231,9 @@ public final class BackupCommands {
case REPAIR:
cmd = new RepairCommand(conf, cmdline);
break;
+ case MERGE:
+ cmd = new MergeCommand(conf, cmdline);
+ break;
case HELP:
default:
cmd = new HelpCommand(conf, cmdline);
@@ -257,7 +271,7 @@ public final class BackupCommands {
throw new IOException(INCORRECT_USAGE);
}
String[] args = cmdline.getArgs();
- if (args.length !=3) {
+ if (args.length != 3) {
printUsage();
throw new IOException(INCORRECT_USAGE);
}
@@ -274,7 +288,6 @@ public final class BackupCommands {
throw new IOException(INCORRECT_USAGE);
}
-
String tables = null;
// Check if we have both: backup set and list of tables
@@ -310,14 +323,14 @@ public final class BackupCommands {
try (BackupAdminImpl admin = new BackupAdminImpl(conn);) {
- BackupRequest.Builder builder = new BackupRequest.Builder();
- BackupRequest request = builder.withBackupType(BackupType.valueOf(args[1].toUpperCase()))
- .withTableList(tables != null ?
- Lists.newArrayList(BackupUtils.parseTableNames(tables)) : null)
- .withTargetRootDir(args[2])
- .withTotalTasks(workers)
- .withBandwidthPerTasks(bandwidth)
- .withBackupSetName(setName).build();
+ BackupRequest.Builder builder = new BackupRequest.Builder();
+ BackupRequest request =
+ builder
+ .withBackupType(BackupType.valueOf(args[1].toUpperCase()))
+ .withTableList(
+ tables != null ? Lists.newArrayList(BackupUtils.parseTableNames(tables)) : null)
+ .withTargetRootDir(args[2]).withTotalTasks(workers)
+ .withBandwidthPerTasks(bandwidth).withBackupSetName(setName).build();
String backupId = admin.backupTables(request);
System.out.println("Backup session " + backupId + " finished. Status: SUCCESS");
} catch (IOException e) {
@@ -544,7 +557,8 @@ public final class BackupCommands {
int deleted = admin.deleteBackups(backupIds);
System.out.println("Deleted " + deleted + " backups. Total requested: " + args.length);
} catch (IOException e) {
- System.err.println("Delete command FAILED. Please run backup repair tool to restore backup system integrity");
+ System.err
+ .println("Delete command FAILED. Please run backup repair tool to restore backup system integrity");
throw e;
}
@@ -584,8 +598,9 @@ public final class BackupCommands {
if (list.size() == 0) {
// No failed sessions found
System.out.println("REPAIR status: no failed sessions found."
- +" Checking failed delete backup operation ...");
+ + " Checking failed delete backup operation ...");
repairFailedBackupDeletionIfAny(conn, sysTable);
+ repairFailedBackupMergeIfAny(conn, sysTable);
return;
}
backupInfo = list.get(0);
@@ -606,32 +621,55 @@ public final class BackupCommands {
// If backup session is updated to FAILED state - means we
// processed recovery already.
sysTable.updateBackupInfo(backupInfo);
- sysTable.finishBackupSession();
- System.out.println("REPAIR status: finished repair failed session:\n "+ backupInfo);
+ sysTable.finishBackupExclusiveOperation();
+ System.out.println("REPAIR status: finished repair failed session:\n " + backupInfo);
}
}
private void repairFailedBackupDeletionIfAny(Connection conn, BackupSystemTable sysTable)
- throws IOException
- {
+ throws IOException {
String[] backupIds = sysTable.getListOfBackupIdsFromDeleteOperation();
- if (backupIds == null ||backupIds.length == 0) {
- System.out.println("No failed backup delete operation found");
+ if (backupIds == null || backupIds.length == 0) {
+ System.out.println("No failed backup DELETE operation found");
// Delete backup table snapshot if exists
BackupSystemTable.deleteSnapshot(conn);
return;
}
- System.out.println("Found failed delete operation for: " + StringUtils.join(backupIds));
- System.out.println("Running delete again ...");
+ System.out.println("Found failed DELETE operation for: " + StringUtils.join(backupIds));
+ System.out.println("Running DELETE again ...");
// Restore table from snapshot
BackupSystemTable.restoreFromSnapshot(conn);
// Finish previous failed session
- sysTable.finishBackupSession();
- try(BackupAdmin admin = new BackupAdminImpl(conn);) {
+ sysTable.finishBackupExclusiveOperation();
+ try (BackupAdmin admin = new BackupAdminImpl(conn);) {
admin.deleteBackups(backupIds);
}
- System.out.println("Delete operation finished OK: "+ StringUtils.join(backupIds));
+ System.out.println("DELETE operation finished OK: " + StringUtils.join(backupIds));
+
+ }
+
+ private void repairFailedBackupMergeIfAny(Connection conn, BackupSystemTable sysTable)
+ throws IOException {
+ String[] backupIds = sysTable.getListOfBackupIdsFromMergeOperation();
+ if (backupIds == null || backupIds.length == 0) {
+ System.out.println("No failed backup MERGE operation found");
+ // Delete backup table snapshot if exists
+ BackupSystemTable.deleteSnapshot(conn);
+ return;
+ }
+ System.out.println("Found failed MERGE operation for: " + StringUtils.join(backupIds));
+ System.out.println("Running MERGE again ...");
+ // Restore table from snapshot
+ BackupSystemTable.restoreFromSnapshot(conn);
+ // Unlock backup system
+ sysTable.finishBackupExclusiveOperation();
+ // Finish previous failed session
+ sysTable.finishMergeOperation();
+ try (BackupAdmin admin = new BackupAdminImpl(conn);) {
+ admin.mergeBackups(backupIds);
+ }
+ System.out.println("MERGE operation finished OK: " + StringUtils.join(backupIds));
}
@@ -641,6 +679,56 @@ public final class BackupCommands {
}
}
+ private static class MergeCommand extends Command {
+
+ MergeCommand(Configuration conf, CommandLine cmdline) {
+ super(conf);
+ this.cmdline = cmdline;
+ }
+
+ @Override
+ protected boolean requiresNoActiveSession() {
+ return true;
+ }
+
+ @Override
+ protected boolean requiresConsistentState() {
+ return true;
+ }
+
+ @Override
+ public void execute() throws IOException {
+ super.execute();
+
+ String[] args = cmdline == null ? null : cmdline.getArgs();
+ if (args == null || (args.length != 2)) {
+ System.err.println("ERROR: wrong number of arguments: "
+ + (args == null ? null : args.length));
+ printUsage();
+ throw new IOException(INCORRECT_USAGE);
+ }
+
+ String[] backupIds = args[1].split(",");
+ if (backupIds.length < 2) {
+ String msg = "ERROR: can not merge a single backup image. "+
+ "Number of images must be greater than 1.";
+ System.err.println(msg);
+ throw new IOException(msg);
+
+ }
+ Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create();
+ try (final Connection conn = ConnectionFactory.createConnection(conf);
+ final BackupAdminImpl admin = new BackupAdminImpl(conn);) {
+ admin.mergeBackups(backupIds);
+ }
+ }
+
+ @Override
+ protected void printUsage() {
+ System.out.println(MERGE_CMD_USAGE);
+ }
+ }
+
// TODO Cancel command
private static class CancelCommand extends Command {
@@ -672,7 +760,6 @@ public final class BackupCommands {
@Override
public void execute() throws IOException {
-
int n = parseHistoryLength();
final TableName tableName = getTableName();
final String setName = getTableSetName();
@@ -883,7 +970,7 @@ public final class BackupCommands {
private TableName[] toTableNames(String[] tables) {
TableName[] arr = new TableName[tables.length];
- for (int i=0; i < tables.length; i++) {
+ for (int i = 0; i < tables.length; i++) {
arr[i] = TableName.valueOf(tables[i]);
}
return arr;
http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index bf80506..8fe5eaf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -115,8 +115,8 @@ public class BackupManager implements Closeable {
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Added log cleaner: " + cleanerClass +"\n" +
- "Added master procedure manager: " + masterProcedureClass);
+ LOG.debug("Added log cleaner: " + cleanerClass + "\n" + "Added master procedure manager: "
+ + masterProcedureClass);
}
}
@@ -185,9 +185,8 @@ public class BackupManager implements Closeable {
* @return BackupInfo
* @throws BackupException exception
*/
- public BackupInfo createBackupInfo(String backupId, BackupType type,
- List<TableName> tableList, String targetRootDir, int workers, long bandwidth)
- throws BackupException {
+ public BackupInfo createBackupInfo(String backupId, BackupType type, List<TableName> tableList,
+ String targetRootDir, int workers, long bandwidth) throws BackupException {
if (targetRootDir == null) {
throw new BackupException("Wrong backup request parameter: target backup root directory");
}
@@ -313,7 +312,7 @@ public class BackupManager implements Closeable {
}
} else {
Path logBackupPath =
- HBackupFileSystem.getLogBackupPath(backup.getBackupRootDir(), backup.getBackupId());
+ HBackupFileSystem.getBackupPath(backup.getBackupRootDir(), backup.getBackupId());
LOG.debug("Current backup has an incremental backup ancestor, "
+ "touching its image manifest in " + logBackupPath.toString()
+ " to construct the dependency.");
@@ -371,7 +370,7 @@ public class BackupManager implements Closeable {
* @throws IOException if active session already exists
*/
public void startBackupSession() throws IOException {
- systemTable.startBackupSession();
+ systemTable.startBackupExclusiveOperation();
}
/**
@@ -379,10 +378,9 @@ public class BackupManager implements Closeable {
* @throws IOException if no active session
*/
public void finishBackupSession() throws IOException {
- systemTable.finishBackupSession();
+ systemTable.finishBackupExclusiveOperation();
}
-
/**
* Read the last backup start code (timestamp) of last successful backup. Will return null if
* there is no startcode stored in backup system table or the value is of length 0. These two
@@ -413,7 +411,7 @@ public class BackupManager implements Closeable {
}
public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
- readBulkloadRows(List<TableName> tableList) throws IOException {
+ readBulkloadRows(List<TableName> tableList) throws IOException {
return systemTable.readBulkloadRows(tableList);
}
@@ -448,8 +446,7 @@ public class BackupManager implements Closeable {
*/
public void writeRegionServerLogTimestamp(Set<TableName> tables,
HashMap<String, Long> newTimestamps) throws IOException {
- systemTable.writeRegionServerLogTimestamp(tables, newTimestamps,
- backupInfo.getBackupRootDir());
+ systemTable.writeRegionServerLogTimestamp(tables, newTimestamps, backupInfo.getBackupRootDir());
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
index b8adac9..7e3201e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -61,9 +62,8 @@ public class BackupManifest {
public static final String MANIFEST_FILE_NAME = ".backup.manifest";
/**
- * Backup image, the dependency graph is made up by series of backup images
- * BackupImage contains all the relevant information to restore the backup and
- * is used during restore operation
+ * Backup image. The dependency graph is made up of a series of backup images. BackupImage
+ * contains all the relevant information to restore the backup and is used during restore operation.
*/
public static class BackupImage implements Comparable<BackupImage> {
@@ -294,6 +294,16 @@ public class BackupManifest {
return this.ancestors;
}
+ public void removeAncestors(List<String> backupIds) {
+ List<BackupImage> toRemove = new ArrayList<BackupImage>();
+ for (BackupImage im : this.ancestors) {
+ if (backupIds.contains(im.getBackupId())) {
+ toRemove.add(im);
+ }
+ }
+ this.ancestors.removeAll(toRemove);
+ }
+
private void addAncestor(BackupImage backupImage) {
this.getAncestors().add(backupImage);
}
@@ -464,18 +474,16 @@ public class BackupManifest {
}
/**
- * Persist the manifest file.
+ * TODO: fix it. Persist the manifest file.
* @throws IOException IOException when storing the manifest file.
*/
public void store(Configuration conf) throws BackupException {
byte[] data = backupImage.toProto().toByteArray();
// write the file, overwrite if already exist
- String logBackupDir =
- BackupUtils.getLogBackupDir(backupImage.getRootDir(), backupImage.getBackupId());
Path manifestFilePath =
- new Path(new Path((tableBackupDir != null ? tableBackupDir : logBackupDir)),
- MANIFEST_FILE_NAME);
+ new Path(HBackupFileSystem.getBackupPath(backupImage.getRootDir(),
+ backupImage.getBackupId()), MANIFEST_FILE_NAME);
try (FSDataOutputStream out =
manifestFilePath.getFileSystem(conf).create(manifestFilePath, true);) {
out.write(data);
http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index e5a3daa..4dab046 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.util.Pair;
* value = backupId and full WAL file name</li>
* </ul></p>
*/
+
@InterfaceAudience.Private
public final class BackupSystemTable implements Closeable {
private static final Log LOG = LogFactory.getLog(BackupSystemTable.class);
@@ -118,7 +119,7 @@ public final class BackupSystemTable implements Closeable {
private TableName tableName;
/**
- * Stores backup sessions (contexts)
+ * Stores backup sessions (contexts)
*/
final static byte[] SESSIONS_FAMILY = "session".getBytes();
/**
@@ -127,11 +128,10 @@ public final class BackupSystemTable implements Closeable {
final static byte[] META_FAMILY = "meta".getBytes();
final static byte[] BULK_LOAD_FAMILY = "bulk".getBytes();
/**
- * Connection to HBase cluster, shared among all instances
+ * Connection to HBase cluster, shared among all instances
*/
private final Connection connection;
-
private final static String BACKUP_INFO_PREFIX = "session:";
private final static String START_CODE_ROW = "startcode:";
private final static byte[] ACTIVE_SESSION_ROW = "activesession:".getBytes();
@@ -147,6 +147,7 @@ public final class BackupSystemTable implements Closeable {
private final static String BULK_LOAD_PREFIX = "bulk:";
private final static byte[] BULK_LOAD_PREFIX_BYTES = BULK_LOAD_PREFIX.getBytes();
private final static byte[] DELETE_OP_ROW = "delete_op_row".getBytes();
+ private final static byte[] MERGE_OP_ROW = "merge_op_row".getBytes();
final static byte[] TBL_COL = Bytes.toBytes("tbl");
final static byte[] FAM_COL = Bytes.toBytes("fam");
@@ -160,7 +161,7 @@ public final class BackupSystemTable implements Closeable {
private final static String SET_KEY_PREFIX = "backupset:";
// separator between BULK_LOAD_PREFIX and ordinals
- protected final static String BLK_LD_DELIM = ":";
+ protected final static String BLK_LD_DELIM = ":";
private final static byte[] EMPTY_VALUE = new byte[] {};
// Safe delimiter in a string
@@ -187,19 +188,19 @@ public final class BackupSystemTable implements Closeable {
}
private void verifyNamespaceExists(Admin admin) throws IOException {
- String namespaceName = tableName.getNamespaceAsString();
- NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build();
- NamespaceDescriptor[] list = admin.listNamespaceDescriptors();
- boolean exists = false;
- for( NamespaceDescriptor nsd: list) {
- if (nsd.getName().equals(ns.getName())) {
- exists = true;
- break;
- }
- }
- if (!exists) {
- admin.createNamespace(ns);
+ String namespaceName = tableName.getNamespaceAsString();
+ NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build();
+ NamespaceDescriptor[] list = admin.listNamespaceDescriptors();
+ boolean exists = false;
+ for (NamespaceDescriptor nsd : list) {
+ if (nsd.getName().equals(ns.getName())) {
+ exists = true;
+ break;
}
+ }
+ if (!exists) {
+ admin.createNamespace(ns);
+ }
}
private void waitForSystemTable(Admin admin) throws IOException {
@@ -211,15 +212,13 @@ public final class BackupSystemTable implements Closeable {
} catch (InterruptedException e) {
}
if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) {
- throw new IOException("Failed to create backup system table after "+ TIMEOUT+"ms");
+ throw new IOException("Failed to create backup system table after " + TIMEOUT + "ms");
}
}
LOG.debug("Backup table exists and available");
}
-
-
@Override
public void close() {
// do nothing
@@ -257,7 +256,7 @@ public final class BackupSystemTable implements Closeable {
byte[] row = CellUtil.cloneRow(res.listCells().get(0));
for (Cell cell : res.listCells()) {
if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
- BackupSystemTable.PATH_COL.length) == 0) {
+ BackupSystemTable.PATH_COL.length) == 0) {
map.put(row, Bytes.toString(CellUtil.cloneValue(cell)));
}
}
@@ -286,13 +285,13 @@ public final class BackupSystemTable implements Closeable {
String path = null;
for (Cell cell : res.listCells()) {
if (CellComparator.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0,
- BackupSystemTable.TBL_COL.length) == 0) {
+ BackupSystemTable.TBL_COL.length) == 0) {
tbl = TableName.valueOf(CellUtil.cloneValue(cell));
} else if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
- BackupSystemTable.FAM_COL.length) == 0) {
+ BackupSystemTable.FAM_COL.length) == 0) {
fam = CellUtil.cloneValue(cell);
} else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
- BackupSystemTable.PATH_COL.length) == 0) {
+ BackupSystemTable.PATH_COL.length) == 0) {
path = Bytes.toString(CellUtil.cloneValue(cell));
}
}
@@ -313,9 +312,10 @@ public final class BackupSystemTable implements Closeable {
}
files.add(new Path(path));
if (LOG.isDebugEnabled()) {
- LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path);
+ LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path);
}
- };
+ }
+ ;
return mapForSrc;
}
}
@@ -359,16 +359,16 @@ public final class BackupSystemTable implements Closeable {
public void writePathsPostBulkLoad(TableName tabName, byte[] region,
Map<byte[], List<Path>> finalPaths) throws IOException {
if (LOG.isDebugEnabled()) {
- LOG.debug("write bulk load descriptor to backup " + tabName + " with " +
- finalPaths.size() + " entries");
+ LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size()
+ + " entries");
}
try (Table table = connection.getTable(tableName)) {
- List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region,
- finalPaths);
+ List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths);
table.put(puts);
LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
}
}
+
/*
* For preCommitStoreFile() hook
* @param tabName table name
@@ -376,15 +376,15 @@ public final class BackupSystemTable implements Closeable {
* @param family column family
* @param pairs list of paths for hfiles
*/
- public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region,
- final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
+ public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family,
+ final List<Pair<Path, Path>> pairs) throws IOException {
if (LOG.isDebugEnabled()) {
- LOG.debug("write bulk load descriptor to backup " + tabName + " with " +
- pairs.size() + " entries");
+ LOG.debug("write bulk load descriptor to backup " + tabName + " with " + pairs.size()
+ + " entries");
}
try (Table table = connection.getTable(tableName)) {
- List<Put> puts = BackupSystemTable.createPutForPreparedBulkload(tabName, region,
- family, pairs);
+ List<Put> puts =
+ BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs);
table.put(puts);
LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
}
@@ -411,11 +411,11 @@ public final class BackupSystemTable implements Closeable {
/*
* Reads the rows from backup table recording bulk loaded hfiles
* @param tableList list of table names
- * @return The keys of the Map are table, region and column family.
- * Value of the map reflects whether the hfile was recorded by preCommitStoreFile hook (true)
+ * @return The keys of the Map are table, region and column family. Value of the map reflects
+ * whether the hfile was recorded by preCommitStoreFile hook (true)
*/
public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
- readBulkloadRows(List<TableName> tableList) throws IOException {
+ readBulkloadRows(List<TableName> tableList) throws IOException {
Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>();
List<byte[]> rows = new ArrayList<>();
for (TableName tTable : tableList) {
@@ -437,13 +437,13 @@ public final class BackupSystemTable implements Closeable {
String rowStr = Bytes.toString(row);
region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
- BackupSystemTable.FAM_COL.length) == 0) {
+ BackupSystemTable.FAM_COL.length) == 0) {
fam = Bytes.toString(CellUtil.cloneValue(cell));
} else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
- BackupSystemTable.PATH_COL.length) == 0) {
+ BackupSystemTable.PATH_COL.length) == 0) {
path = Bytes.toString(CellUtil.cloneValue(cell));
} else if (CellComparator.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0,
- BackupSystemTable.STATE_COL.length) == 0) {
+ BackupSystemTable.STATE_COL.length) == 0) {
byte[] state = CellUtil.cloneValue(cell);
if (Bytes.equals(BackupSystemTable.BL_PREPARE, state)) {
raw = true;
@@ -484,12 +484,13 @@ public final class BackupSystemTable implements Closeable {
Map<byte[], List<Path>> map = maps[idx];
TableName tn = sTableList.get(idx);
if (map == null) continue;
- for (Map.Entry<byte[], List<Path>> entry: map.entrySet()) {
+ for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
byte[] fam = entry.getKey();
List<Path> paths = entry.getValue();
for (Path p : paths) {
- Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(),
- backupId, ts, cnt++);
+ Put put =
+ BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId, ts,
+ cnt++);
puts.add(put);
}
}
@@ -564,18 +565,23 @@ public final class BackupSystemTable implements Closeable {
}
}
- public void startBackupSession() throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Start new backup session");
+ /**
+ * Starts a backup exclusive operation. Exclusive operations are:
+ * create, delete, merge
+ * @throws IOException if another backup exclusive operation is already active
+ */
+ public void startBackupExclusiveOperation() throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Start new backup exclusive operation");
}
try (Table table = connection.getTable(tableName)) {
Put put = createPutForStartBackupSession();
- //First try to put if row does not exist
+ // First try to put if row does not exist
if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, null, put)) {
// Row exists, try to put if value == ACTIVE_SESSION_NO
if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL,
ACTIVE_SESSION_NO, put)) {
- throw new IOException("There is an active backup session");
+ throw new IOException("There is an active backup exclusive operation");
}
}
}
@@ -587,17 +593,15 @@ public final class BackupSystemTable implements Closeable {
return put;
}
- public void finishBackupSession() throws IOException
- {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Stop backup session");
+ public void finishBackupExclusiveOperation() throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Finish backup exclusive operation");
}
try (Table table = connection.getTable(tableName)) {
Put put = createPutForStopBackupSession();
- if(!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL,
- ACTIVE_SESSION_YES, put))
- {
- throw new IOException("There is no active backup session");
+ if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL,
+ ACTIVE_SESSION_YES, put)) {
+ throw new IOException("There is no active backup exclusive operation");
}
}
}
@@ -630,8 +634,7 @@ public final class BackupSystemTable implements Closeable {
res.advance();
Cell cell = res.current();
byte[] row = CellUtil.cloneRow(cell);
- String server =
- getServerNameForReadRegionServerLastLogRollResult(row);
+ String server = getServerNameForReadRegionServerLastLogRollResult(row);
byte[] data = CellUtil.cloneValue(cell);
rsTimestampMap.put(server, Bytes.toLong(data));
}
@@ -652,8 +655,7 @@ public final class BackupSystemTable implements Closeable {
LOG.trace("write region server last roll log result to backup system table");
}
try (Table table = connection.getTable(tableName)) {
- Put put =
- createPutForRegionServerLastLogRollResult(server, ts, backupRoot);
+ Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot);
table.put(put);
}
}
@@ -685,14 +687,15 @@ public final class BackupSystemTable implements Closeable {
/**
* Get first n backup history records
- * @param n number of records
+ * @param n number of records; if n == -1, the limit
+ * is ignored and the full history is returned
* @return list of records
* @throws IOException
*/
public List<BackupInfo> getHistory(int n) throws IOException {
List<BackupInfo> history = getBackupHistory();
- if (history.size() <= n) return history;
+ if (n == -1 || history.size() <= n) return history;
List<BackupInfo> list = new ArrayList<BackupInfo>();
for (int i = 0; i < n; i++) {
list.add(history.get(i));
@@ -703,7 +706,8 @@ public final class BackupSystemTable implements Closeable {
/**
* Get backup history records filtered by list of filters.
- * @param n max number of records
+ * @param n max number of records; if n is negative (e.g. -1), the limit
+ * is ignored
* @param filters list of filters
* @return backup records
* @throws IOException
@@ -714,7 +718,7 @@ public final class BackupSystemTable implements Closeable {
List<BackupInfo> history = getBackupHistory();
List<BackupInfo> result = new ArrayList<BackupInfo>();
for (BackupInfo bi : history) {
- if (result.size() == n) break;
+ if (n >= 0 && result.size() == n) break;
boolean passed = true;
for (int i = 0; i < filters.length; i++) {
if (!filters[i].apply(bi)) {
@@ -852,9 +856,7 @@ public final class BackupSystemTable implements Closeable {
List<Put> puts = new ArrayList<Put>();
for (TableName table : tables) {
byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray();
- Put put =
- createPutForWriteRegionServerLogTimestamp(table, smapData,
- backupRoot);
+ Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot);
puts.add(put);
}
try (Table table = connection.getTable(tableName)) {
@@ -1018,8 +1020,7 @@ public final class BackupSystemTable implements Closeable {
}
}
try (Table table = connection.getTable(tableName)) {
- List<Put> puts =
- createPutsForAddWALFiles(files, backupId, backupRoot);
+ List<Put> puts = createPutsForAddWALFiles(files, backupId, backupRoot);
table.put(puts);
}
}
@@ -1087,6 +1088,7 @@ public final class BackupSystemTable implements Closeable {
* @param file name of a file to check
* @return true, if deletable, false otherwise.
* @throws IOException exception
+ * TODO: multiple backup destination support
*/
public boolean isWALFileDeletable(String file) throws IOException {
if (LOG.isTraceEnabled()) {
@@ -1271,12 +1273,12 @@ public final class BackupSystemTable implements Closeable {
if (disjoint.length > 0 && disjoint.length != tables.length) {
Put put = createPutForBackupSet(name, disjoint);
table.put(put);
- } else if(disjoint.length == tables.length) {
+ } else if (disjoint.length == tables.length) {
LOG.warn("Backup set '" + name + "' does not contain tables ["
+ StringUtils.join(toRemove, " ") + "]");
} else { // disjoint.length == 0 and tables.length >0
- // Delete backup set
- LOG.info("Backup set '"+name+"' is empty. Deleting.");
+ // Delete backup set
+ LOG.info("Backup set '" + name + "' is empty. Deleting.");
deleteBackupSet(name);
}
} finally {
@@ -1356,7 +1358,7 @@ public final class BackupSystemTable implements Closeable {
}
public static String getSnapshotName(Configuration conf) {
- return "snapshot_"+getTableNameAsString(conf).replace(":", "_");
+ return "snapshot_" + getTableNameAsString(conf).replace(":", "_");
}
/**
@@ -1589,17 +1591,16 @@ public final class BackupSystemTable implements Closeable {
for (Path path : entry.getValue()) {
String file = path.toString();
int lastSlash = file.lastIndexOf("/");
- String filename = file.substring(lastSlash+1);
- Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
- Bytes.toString(region), BLK_LD_DELIM, filename));
+ String filename = file.substring(lastSlash + 1);
+ Put put =
+ new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
+ Bytes.toString(region), BLK_LD_DELIM, filename));
put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
- put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL,
- file.getBytes());
+ put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes());
put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT);
puts.add(put);
- LOG.debug("writing done bulk path " + file + " for " + table + " " +
- Bytes.toString(region));
+ LOG.debug("writing done bulk path " + file + " for " + table + " " + Bytes.toString(region));
}
}
return puts;
@@ -1607,19 +1608,16 @@ public final class BackupSystemTable implements Closeable {
public static void snapshot(Connection conn) throws IOException {
- try (Admin admin = conn.getAdmin();){
+ try (Admin admin = conn.getAdmin();) {
Configuration conf = conn.getConfiguration();
- admin.snapshot(BackupSystemTable.getSnapshotName(conf),
- BackupSystemTable.getTableName(conf));
+ admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf));
}
}
- public static void restoreFromSnapshot(Connection conn)
- throws IOException {
+ public static void restoreFromSnapshot(Connection conn) throws IOException {
Configuration conf = conn.getConfiguration();
- LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) +
- " from snapshot");
+ LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot");
try (Admin admin = conn.getAdmin();) {
String snapshotName = BackupSystemTable.getSnapshotName(conf);
if (snapshotExists(admin, snapshotName)) {
@@ -1631,8 +1629,8 @@ public final class BackupSystemTable implements Closeable {
// Snapshot does not exists, i.e completeBackup failed after
// deleting backup system table snapshot
// In this case we log WARN and proceed
- LOG.warn("Could not restore backup system table. Snapshot " + snapshotName+
- " does not exists.");
+ LOG.warn("Could not restore backup system table. Snapshot " + snapshotName
+ + " does not exists.");
}
}
}
@@ -1640,7 +1638,7 @@ public final class BackupSystemTable implements Closeable {
protected static boolean snapshotExists(Admin admin, String snapshotName) throws IOException {
List<SnapshotDescription> list = admin.listSnapshots();
- for (SnapshotDescription desc: list) {
+ for (SnapshotDescription desc : list) {
if (desc.getName().equals(snapshotName)) {
return true;
}
@@ -1648,26 +1646,25 @@ public final class BackupSystemTable implements Closeable {
return false;
}
- public static boolean snapshotExists (Connection conn) throws IOException {
+ public static boolean snapshotExists(Connection conn) throws IOException {
return snapshotExists(conn.getAdmin(), getSnapshotName(conn.getConfiguration()));
}
- public static void deleteSnapshot(Connection conn)
- throws IOException {
+ public static void deleteSnapshot(Connection conn) throws IOException {
Configuration conf = conn.getConfiguration();
- LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) +
- " from the system");
+ LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system");
try (Admin admin = conn.getAdmin();) {
String snapshotName = BackupSystemTable.getSnapshotName(conf);
if (snapshotExists(admin, snapshotName)) {
admin.deleteSnapshot(snapshotName);
LOG.debug("Done deleting backup system table snapshot");
} else {
- LOG.error("Snapshot "+snapshotName+" does not exists");
+ LOG.error("Snapshot " + snapshotName + " does not exists");
}
}
}
+
/*
* Creates Put's for bulk load resulting from running LoadIncrementalHFiles
*/
@@ -1678,17 +1675,16 @@ public final class BackupSystemTable implements Closeable {
Path path = pair.getSecond();
String file = path.toString();
int lastSlash = file.lastIndexOf("/");
- String filename = file.substring(lastSlash+1);
- Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
- Bytes.toString(region), BLK_LD_DELIM, filename));
+ String filename = file.substring(lastSlash + 1);
+ Put put =
+ new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, Bytes.toString(region),
+ BLK_LD_DELIM, filename));
put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family);
- put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL,
- file.getBytes());
+ put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes());
put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE);
puts.add(put);
- LOG.debug("writing raw bulk path " + file + " for " + table + " " +
- Bytes.toString(region));
+ LOG.debug("writing raw bulk path " + file + " for " + table + " " + Bytes.toString(region));
}
return puts;
}
@@ -1725,7 +1721,6 @@ public final class BackupSystemTable implements Closeable {
return get;
}
-
public void startDeleteOperation(String[] backupIdList) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList));
@@ -1765,6 +1760,96 @@ public final class BackupSystemTable implements Closeable {
}
}
+ /**
+ * Builds the Put that records the backup ids taking part in a merge operation.
+ * The ids are joined with "," and stored under META_FAMILY:FAM_COL of the MERGE_OP_ROW row.
+ * @param backupIdList backup ids to record
+ * @return put for the merge operation row
+ */
+ private Put createPutForMergeOperation(String[] backupIdList) {
+ String joinedIds = StringUtils.join(backupIdList, ",");
+ byte[] encodedIds = Bytes.toBytes(joinedIds);
+ Put mergePut = new Put(MERGE_OP_ROW);
+ mergePut.addColumn(META_FAMILY, FAM_COL, encodedIds);
+ return mergePut;
+ }
+
+ /**
+ * Checks whether a merge operation record is currently stored.
+ * @return true if the MERGE_OP_ROW row holds any cells, false otherwise
+ * @throws IOException exception
+ */
+ public boolean isMergeInProgress() throws IOException {
+ Get mergeRowGet = new Get(MERGE_OP_ROW);
+ try (Table table = connection.getTable(tableName)) {
+ Result mergeRow = table.get(mergeRowGet);
+ // A non-empty row means a merge has been started and not yet finished
+ return !mergeRow.isEmpty();
+ }
+ }
+
+ /**
+ * Builds the Put that records a list of tables on the merge operation row.
+ * The table names are joined with "," and stored under META_FAMILY:PATH_COL of MERGE_OP_ROW.
+ * @param tables tables to record
+ * @return put for the merge operation row
+ */
+ private Put createPutForUpdateTablesForMerge(List<TableName> tables) {
+ String joinedTables = StringUtils.join(tables, ",");
+ byte[] encodedTables = Bytes.toBytes(joinedTables);
+ Put tablesPut = new Put(MERGE_OP_ROW);
+ tablesPut.addColumn(META_FAMILY, PATH_COL, encodedTables);
+ return tablesPut;
+ }
+
+ /**
+ * Builds the Delete that removes the whole META_FAMILY family of the MERGE_OP_ROW row.
+ * @return delete for the merge operation row
+ */
+ private Delete createDeleteForBackupMergeOperation() {
+ return new Delete(MERGE_OP_ROW).addFamily(META_FAMILY);
+ }
+
+ /**
+ * Builds the Get that reads the META_FAMILY family of the MERGE_OP_ROW row.
+ * @return get for the merge operation row
+ */
+ private Get createGetForMergeOperation() {
+ return new Get(MERGE_OP_ROW).addFamily(META_FAMILY);
+ }
+
+ /**
+ * Records the start of a merge operation by writing the given backup ids to the
+ * merge operation row.
+ * @param backupIdList ids of the backups being merged
+ * @throws IOException exception
+ */
+ public void startMergeOperation(String[] backupIdList) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ // Join with an explicit separator; the one-argument join concatenates the ids
+ // with nothing between them, which makes the trace output unreadable.
+ LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList, ","));
+ }
+ Put put = createPutForMergeOperation(backupIdList);
+ try (Table table = connection.getTable(tableName)) {
+ table.put(put);
+ }
+ }
+
+ /**
+ * Writes the given list of tables to the merge operation row.
+ * @param tables tables to record for the merge operation
+ * @throws IOException exception
+ */
+ public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ String joinedTables = StringUtils.join(tables, ",");
+ LOG.trace("Update tables for merge : " + joinedTables);
+ }
+ Put tablesPut = createPutForUpdateTablesForMerge(tables);
+ try (Table systemTable = connection.getTable(tableName)) {
+ systemTable.put(tablesPut);
+ }
+ }
+
+ /**
+ * Marks the merge operation as finished by deleting the merge operation row's
+ * META_FAMILY data.
+ * @throws IOException exception
+ */
+ public void finishMergeOperation() throws IOException {
+ if (LOG.isTraceEnabled()) {
+ // Message corrected: it was misspelled and ended with a dangling "for backup ids "
+ // although no ids are available to this method.
+ LOG.trace("Finish merge operation");
+ }
+ Delete delete = createDeleteForBackupMergeOperation();
+ try (Table table = connection.getTable(tableName)) {
+ table.delete(delete);
+ }
+ }
+
+ /**
+ * Reads the backup ids recorded on the merge operation row.
+ * @return array of backup ids, or null if the row is empty or holds no id list
+ * @throws IOException exception
+ */
+ public String[] getListOfBackupIdsFromMergeOperation() throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Get backup ids for merge operation");
+ }
+ Get get = createGetForMergeOperation();
+ try (Table table = connection.getTable(tableName)) {
+ Result res = table.get(get);
+ if (res.isEmpty()) {
+ return null;
+ }
+ // The merge row can also carry PATH_COL (see updateProcessedTablesForMerge), so read
+ // the id list by its qualifier instead of relying on which cell sorts first.
+ byte[] val = res.getValue(META_FAMILY, FAM_COL);
+ if (val == null || val.length == 0) {
+ return null;
+ }
+ // Decode with Bytes.toString to mirror the Bytes.toBytes(String) used on write,
+ // rather than the platform default charset.
+ return Bytes.toString(val).split(",");
+ }
+ }
+
static Scan createScanForOrigBulkLoadedFiles(TableName table) throws IOException {
Scan scan = new Scan();
byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM);
@@ -1776,10 +1861,12 @@ public final class BackupSystemTable implements Closeable {
scan.setMaxVersions(1);
return scan;
}
+
/*
* Splits a bulk load row string on BLK_LD_DELIM and returns its second field.
* @param rowStr row key rendered as a string
* @return the field at index 1 of the split row string
*/
static String getTableNameFromOrigBulkLoadRow(String rowStr) {
String[] parts = rowStr.split(BLK_LD_DELIM);
// NOTE(review): assumes the table name is always the second field; if the row can carry a
// separate namespace field ahead of the table, this would return the namespace — confirm
// against the row key layout.
return parts[1];
}
+
static String getRegionNameFromOrigBulkLoadRow(String rowStr) {
// format is bulk : namespace : table : region : file
String[] parts = rowStr.split(BLK_LD_DELIM);
@@ -1791,6 +1878,7 @@ public final class BackupSystemTable implements Closeable {
LOG.debug("bulk row string " + rowStr + " region " + parts[idx]);
return parts[idx];
}
+
/*
* Used to query bulk loaded hfiles which have been copied by incremental backup
* @param backupId the backup Id. It can be null when querying for all tables
@@ -1798,13 +1886,14 @@ public final class BackupSystemTable implements Closeable {
*/
static Scan createScanForBulkLoadedFiles(String backupId) throws IOException {
Scan scan = new Scan();
- byte[] startRow = backupId == null ? BULK_LOAD_PREFIX_BYTES :
- rowkey(BULK_LOAD_PREFIX, backupId+BLK_LD_DELIM);
+ byte[] startRow =
+ backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId
+ + BLK_LD_DELIM);
byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
scan.setStartRow(startRow);
scan.setStopRow(stopRow);
- //scan.setTimeRange(lower, Long.MAX_VALUE);
+ // scan.setTimeRange(lower, Long.MAX_VALUE);
scan.addFamily(BackupSystemTable.META_FAMILY);
scan.setMaxVersions(1);
return scan;
@@ -1812,12 +1901,13 @@ public final class BackupSystemTable implements Closeable {
static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId,
long ts, int idx) {
- Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId+BLK_LD_DELIM+ts+BLK_LD_DELIM+idx));
+ Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM + ts + BLK_LD_DELIM + idx));
put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName());
put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam);
put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, p.getBytes());
return put;
}
+
/**
* Creates put list for list of WAL files
* @param files list of WAL file paths
@@ -1825,8 +1915,9 @@ public final class BackupSystemTable implements Closeable {
* @return put list
* @throws IOException exception
*/
- private List<Put> createPutsForAddWALFiles(List<String> files, String backupId,
- String backupRoot) throws IOException {
+ private List<Put>
+ createPutsForAddWALFiles(List<String> files, String backupId, String backupRoot)
+ throws IOException {
List<Put> puts = new ArrayList<Put>();
for (String file : files) {
@@ -1957,5 +2048,4 @@ public final class BackupSystemTable implements Closeable {
return sb.toString().getBytes();
}
-
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/35aa7aae/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
index 381e9b1..ea7a7b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
-import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreJob;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.backup.util.RestoreTool;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
@@ -58,7 +58,6 @@ public class RestoreTablesClient {
private Configuration conf;
private Connection conn;
private String backupId;
- private String fullBackupId;
private TableName[] sTableArray;
private TableName[] tTableArray;
private String targetRootDir;
@@ -107,8 +106,7 @@ public class RestoreTablesClient {
if (existTableList.size() > 0) {
if (!isOverwrite) {
- LOG.error("Existing table ("
- + existTableList
+ LOG.error("Existing table (" + existTableList
+ ") found in the restore target, please add "
+ "\"-overwrite\" option in the command if you mean"
+ " to restore to these existing tables");
@@ -148,9 +146,8 @@ public class RestoreTablesClient {
Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId);
String lastIncrBackupId = images.length == 1 ? null : images[images.length - 1].getBackupId();
// We need hFS only for full restore (see the code)
- BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId);
+ BackupManifest manifest = HBackupFileSystem.getManifest(conf, backupRoot, backupId);
if (manifest.getType() == BackupType.FULL) {
- fullBackupId = manifest.getBackupImage().getBackupId();
LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full" + " backup image "
+ tableBackupPath.toString());
restoreTool.fullRestoreTable(conn, tableBackupPath, sTable, tTable, truncateIfExists,
@@ -169,8 +166,8 @@ public class RestoreTablesClient {
// full backup path comes first
for (int i = 1; i < images.length; i++) {
BackupImage im = images[i];
- String fileBackupDir = HBackupFileSystem.getTableBackupDir(im.getRootDir(),
- im.getBackupId(), sTable)+ Path.SEPARATOR+"data";
+ String fileBackupDir =
+ HBackupFileSystem.getTableBackupDataDir(im.getRootDir(), im.getBackupId(), sTable);
dirList.add(new Path(fileBackupDir));
}
@@ -196,8 +193,10 @@ public class RestoreTablesClient {
TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>();
boolean truncateIfExists = isOverwrite;
Set<String> backupIdSet = new HashSet<>();
+
for (int i = 0; i < sTableArray.length; i++) {
TableName table = sTableArray[i];
+
BackupManifest manifest = backupManifestMap.get(table);
// Get the image list of this backup for restore in time order from old
// to new.
@@ -213,11 +212,8 @@ public class RestoreTablesClient {
if (restoreImageSet != null && !restoreImageSet.isEmpty()) {
LOG.info("Restore includes the following image(s):");
for (BackupImage image : restoreImageSet) {
- LOG.info("Backup: "
- + image.getBackupId()
- + " "
- + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(),
- table));
+ LOG.info("Backup: " + image.getBackupId() + " "
+ + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), table));
if (image.getType() == BackupType.INCREMENTAL) {
backupIdSet.add(image.getBackupId());
LOG.debug("adding " + image.getBackupId() + " for bulk load");
@@ -232,13 +228,13 @@ public class RestoreTablesClient {
Map<byte[], List<Path>>[] mapForSrc = table.readBulkLoadedFiles(id, sTableList);
Map<LoadQueueItem, ByteBuffer> loaderResult;
conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
- LoadIncrementalHFiles loader = MapReduceRestoreJob.createLoader(conf);
+ LoadIncrementalHFiles loader = BackupUtils.createLoader(conf);
for (int i = 0; i < sTableList.size(); i++) {
if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) {
loaderResult = loader.run(null, mapForSrc[i], tTableArray[i]);
LOG.debug("bulk loading " + sTableList.get(i) + " to " + tTableArray[i]);
if (loaderResult.isEmpty()) {
- String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " +tTableArray[i];
+ String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " + tTableArray[i];
LOG.error(msg);
throw new IOException(msg);
}
@@ -253,7 +249,7 @@ public class RestoreTablesClient {
if (backupId == null) {
return 0;
}
- return Long.parseLong(backupId.substring(backupId.lastIndexOf("_")+1));
+ return Long.parseLong(backupId.substring(backupId.lastIndexOf("_") + 1));
}
static boolean withinRange(long a, long lower, long upper) {
@@ -268,15 +264,15 @@ public class RestoreTablesClient {
// case VALIDATION:
// check the target tables
checkTargetTables(tTableArray, isOverwrite);
+
// case RESTORE_IMAGES:
HashMap<TableName, BackupManifest> backupManifestMap = new HashMap<>();
// check and load backup image manifest for the tables
Path rootPath = new Path(targetRootDir);
HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath,
backupId);
+
restore(backupManifestMap, sTableArray, tTableArray, isOverwrite);
}
-
-
}