Posted to commits@hbase.apache.org by zh...@apache.org on 2021/09/13 01:48:36 UTC
[hbase] 01/02: Revert "HBASE-25891 Remove dependence on storing WAL filenames for backup (#3359)"
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit fc5a8fd3bf6d520cef87ec63dd50cba94f8a207d
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Mon Sep 13 09:47:25 2021 +0800
Revert "HBASE-25891 Remove dependence on storing WAL filenames for backup (#3359)"
This reverts commit df57b1ca6b990f69c5edd08862dbc9956965b45e.
---
.../org/apache/hadoop/hbase/backup/BackupInfo.java | 51 ++---
.../hadoop/hbase/backup/impl/BackupManager.java | 23 ++-
.../hadoop/hbase/backup/impl/BackupManifest.java | 20 +-
.../hbase/backup/impl/BackupSystemTable.java | 207 ++++++++++++++++++++-
.../hbase/backup/impl/FullTableBackupClient.java | 12 +-
.../backup/impl/IncrementalBackupManager.java | 132 ++++++++++++-
.../backup/impl/IncrementalTableBackupClient.java | 8 +-
.../hbase/backup/impl/TableBackupClient.java | 5 +-
.../hbase/backup/master/BackupLogCleaner.java | 98 ++++------
.../hadoop/hbase/backup/util/BackupUtils.java | 19 +-
.../apache/hadoop/hbase/backup/TestBackupBase.java | 47 +++--
.../hadoop/hbase/backup/TestBackupSystemTable.java | 49 ++++-
.../hbase/backup/master/TestBackupLogCleaner.java | 49 ++++-
.../src/main/protobuf/Backup.proto | 4 -
14 files changed, 537 insertions(+), 187 deletions(-)
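In short, this revert restores the pre-HBASE-25891 design: every WAL file that a backup has copied (or made obsolete) is recorded as a "wals:"-prefixed row in the backup system table, and BackupLogCleaner deletes only WALs that have such a row. A minimal sketch of the restored round trip, using names that reappear in the diff below (walPaths, backupId, backupRootDir and connection are placeholders):

    // Record WALs covered by a backup, then ask whether one is deletable.
    try (BackupSystemTable systemTable = new BackupSystemTable(connection)) {
      systemTable.addWALFiles(walPaths, backupId, backupRootDir);
      boolean deletable = systemTable.isWALFileDeletable(walPaths.get(0));
    }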
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
index d8a6940..6c304ca 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos.BackupInfo.Builder;
@@ -134,13 +136,7 @@ public class BackupInfo implements Comparable<BackupInfo> {
* New region server log timestamps for table set after distributed log roll. Key: table name;
* value: map of RegionServer hostname -> last log rolled timestamp
*/
- private Map<TableName, Map<String, Long>> tableSetTimestampMap;
-
- /**
- * Previous Region server log timestamps for table set after distributed log roll key -
- * table name, value - map of RegionServer hostname -> last log rolled timestamp
- */
- private Map<TableName, Map<String, Long>> incrTimestampMap;
+ private HashMap<TableName, HashMap<String, Long>> tableSetTimestampMap;
/**
* Backup progress in %% (0-100)
@@ -194,12 +190,12 @@ public class BackupInfo implements Comparable<BackupInfo> {
this.backupTableInfoMap = backupTableInfoMap;
}
- public Map<TableName, Map<String, Long>> getTableSetTimestampMap() {
+ public HashMap<TableName, HashMap<String, Long>> getTableSetTimestampMap() {
return tableSetTimestampMap;
}
- public void setTableSetTimestampMap(Map<TableName,
- Map<String, Long>> tableSetTimestampMap) {
+ public void setTableSetTimestampMap(HashMap<TableName,
+ HashMap<String, Long>> tableSetTimestampMap) {
this.tableSetTimestampMap = tableSetTimestampMap;
}
@@ -355,19 +351,19 @@ public class BackupInfo implements Comparable<BackupInfo> {
/**
* Set the new region server log timestamps after distributed log roll
- * @param prevTableSetTimestampMap table timestamp map
+ * @param newTableSetTimestampMap table timestamp map
*/
- public void setIncrTimestampMap(Map<TableName,
- Map<String, Long>> prevTableSetTimestampMap) {
- this.incrTimestampMap = prevTableSetTimestampMap;
+ public void setIncrTimestampMap(HashMap<TableName,
+ HashMap<String, Long>> newTableSetTimestampMap) {
+ this.tableSetTimestampMap = newTableSetTimestampMap;
}
/**
* Get new region server log timestamps after distributed log roll
* @return new region server log timestamps
*/
- public Map<TableName, Map<String, Long>> getIncrTimestampMap() {
- return this.incrTimestampMap;
+ public HashMap<TableName, HashMap<String, Long>> getIncrTimestampMap() {
+ return this.tableSetTimestampMap;
}
public TableName getTableBySnapshot(String snapshotName) {
@@ -383,7 +379,6 @@ public class BackupInfo implements Comparable<BackupInfo> {
BackupProtos.BackupInfo.Builder builder = BackupProtos.BackupInfo.newBuilder();
builder.setBackupId(getBackupId());
setBackupTableInfoMap(builder);
- setTableSetTimestampMap(builder);
builder.setCompleteTs(getCompleteTs());
if (getFailedMsg() != null) {
builder.setFailedMessage(getFailedMsg());
@@ -451,16 +446,6 @@ public class BackupInfo implements Comparable<BackupInfo> {
}
}
- private void setTableSetTimestampMap(Builder builder) {
- if (this.getTableSetTimestampMap() != null) {
- for (Entry<TableName, Map<String, Long>> entry : this.getTableSetTimestampMap().entrySet()) {
- builder.putTableSetTimestamp(entry.getKey().getNameAsString(),
- BackupProtos.BackupInfo.RSTimestampMap.newBuilder().putAllRsTimestamp(entry.getValue())
- .build());
- }
- }
- }
-
public static BackupInfo fromByteArray(byte[] data) throws IOException {
return fromProto(BackupProtos.BackupInfo.parseFrom(data));
}
@@ -473,7 +458,6 @@ public class BackupInfo implements Comparable<BackupInfo> {
BackupInfo context = new BackupInfo();
context.setBackupId(proto.getBackupId());
context.setBackupTableInfoMap(toMap(proto.getBackupTableInfoList()));
- context.setTableSetTimestampMap(getTableSetTimestampMap(proto.getTableSetTimestampMap()));
context.setCompleteTs(proto.getCompleteTs());
if (proto.hasFailedMessage()) {
context.setFailedMsg(proto.getFailedMessage());
@@ -507,17 +491,6 @@ public class BackupInfo implements Comparable<BackupInfo> {
return map;
}
- private static Map<TableName, Map<String, Long>> getTableSetTimestampMap(
- Map<String, BackupProtos.BackupInfo.RSTimestampMap> map) {
- Map<TableName, Map<String, Long>> tableSetTimestampMap = new HashMap<>();
- for (Entry<String, BackupProtos.BackupInfo.RSTimestampMap> entry : map.entrySet()) {
- tableSetTimestampMap
- .put(TableName.valueOf(entry.getKey()), entry.getValue().getRsTimestampMap());
- }
-
- return tableSetTimestampMap;
- }
-
public String getShortDescription() {
StringBuilder sb = new StringBuilder();
sb.append("{");
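One behavioral detail in the restored BackupInfo: the separate incrTimestampMap field is gone again, and setIncrTimestampMap() now writes into tableSetTimestampMap, so both getters expose the same map. A hedged sketch of that aliasing, assuming a backupManager and backupInfo in scope as in the backup clients further down:

    HashMap<TableName, HashMap<String, Long>> prev = backupManager.readLogTimestampMap();
    backupInfo.setIncrTimestampMap(prev);
    // After this revert the two accessors return the same underlying map:
    assert backupInfo.getIncrTimestampMap() == backupInfo.getTableSetTimestampMap();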
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index 08494f0..79c702c 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -481,7 +482,7 @@ public class BackupManager implements Closeable {
* @throws IOException exception
*/
public void writeRegionServerLogTimestamp(Set<TableName> tables,
- Map<String, Long> newTimestamps) throws IOException {
+ HashMap<String, Long> newTimestamps) throws IOException {
systemTable.writeRegionServerLogTimestamp(tables, newTimestamps, backupInfo.getBackupRootDir());
}
@@ -492,7 +493,7 @@ public class BackupManager implements Closeable {
* RegionServer,PreviousTimeStamp
* @throws IOException exception
*/
- public Map<TableName, Map<String, Long>> readLogTimestampMap() throws IOException {
+ public HashMap<TableName, HashMap<String, Long>> readLogTimestampMap() throws IOException {
return systemTable.readLogTimestampMap(backupInfo.getBackupRootDir());
}
@@ -514,6 +515,24 @@ public class BackupManager implements Closeable {
systemTable.addIncrementalBackupTableSet(tables, backupInfo.getBackupRootDir());
}
+ /**
+ * Saves the list of WAL files after an incremental backup operation. These files will be stored
+ * until TTL expiration and are used by the Backup Log Cleaner plug-in to determine which WAL
+ * files can be safely purged.
+ */
+ public void recordWALFiles(List<String> files) throws IOException {
+ systemTable.addWALFiles(files, backupInfo.getBackupId(), backupInfo.getBackupRootDir());
+ }
+
+ /**
+ * Get WAL files iterator.
+ * @return WAL files iterator from backup system table
+ * @throws IOException if getting the WAL files iterator fails
+ */
+ public Iterator<BackupSystemTable.WALItem> getWALFilesFromBackupSystem() throws IOException {
+ return systemTable.getWALFilesIterator(backupInfo.getBackupRootDir());
+ }
+
public Connection getConnection() {
return conn;
}
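The two methods re-added above are the write and read halves of WAL tracking at the BackupManager level. A hedged usage sketch (manager and walFiles are placeholders):

    manager.recordWALFiles(walFiles); // rows live until TTL/cleaner processing
    Iterator<BackupSystemTable.WALItem> it = manager.getWALFilesFromBackupSystem();
    while (it.hasNext()) {
      BackupSystemTable.WALItem item = it.next();
      // each item carries backupId, walFile and backupRoot, as rebuilt in next()
    }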
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
index 4d4965d..049d38b 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
@@ -116,7 +116,7 @@ public class BackupManifest {
private long startTs;
private long completeTs;
private ArrayList<BackupImage> ancestors;
- private Map<TableName, Map<String, Long>> incrTimeRanges;
+ private HashMap<TableName, HashMap<String, Long>> incrTimeRanges;
static Builder newBuilder() {
return new Builder();
@@ -187,11 +187,11 @@ public class BackupManifest {
return builder.build();
}
- private static Map<TableName, Map<String, Long>> loadIncrementalTimestampMap(
+ private static HashMap<TableName, HashMap<String, Long>> loadIncrementalTimestampMap(
BackupProtos.BackupImage proto) {
List<BackupProtos.TableServerTimestamp> list = proto.getTstMapList();
- Map<TableName, Map<String, Long>> incrTimeRanges = new HashMap<>();
+ HashMap<TableName, HashMap<String, Long>> incrTimeRanges = new HashMap<>();
if (list == null || list.size() == 0) {
return incrTimeRanges;
@@ -199,7 +199,7 @@ public class BackupManifest {
for (BackupProtos.TableServerTimestamp tst : list) {
TableName tn = ProtobufUtil.toTableName(tst.getTableName());
- Map<String, Long> map = incrTimeRanges.get(tn);
+ HashMap<String, Long> map = incrTimeRanges.get(tn);
if (map == null) {
map = new HashMap<>();
incrTimeRanges.put(tn, map);
@@ -217,9 +217,9 @@ public class BackupManifest {
if (this.incrTimeRanges == null) {
return;
}
- for (Entry<TableName, Map<String, Long>> entry : this.incrTimeRanges.entrySet()) {
+ for (Entry<TableName, HashMap<String, Long>> entry : this.incrTimeRanges.entrySet()) {
TableName key = entry.getKey();
- Map<String, Long> value = entry.getValue();
+ HashMap<String, Long> value = entry.getValue();
BackupProtos.TableServerTimestamp.Builder tstBuilder =
BackupProtos.TableServerTimestamp.newBuilder();
tstBuilder.setTableName(ProtobufUtil.toProtoTableName(key));
@@ -359,11 +359,11 @@ public class BackupManifest {
return hash;
}
- public Map<TableName, Map<String, Long>> getIncrTimeRanges() {
+ public HashMap<TableName, HashMap<String, Long>> getIncrTimeRanges() {
return incrTimeRanges;
}
- private void setIncrTimeRanges(Map<TableName, Map<String, Long>> incrTimeRanges) {
+ private void setIncrTimeRanges(HashMap<TableName, HashMap<String, Long>> incrTimeRanges) {
this.incrTimeRanges = incrTimeRanges;
}
}
@@ -512,11 +512,11 @@ public class BackupManifest {
* Set the incremental timestamp map directly.
* @param incrTimestampMap timestamp map
*/
- public void setIncrTimestampMap(Map<TableName, Map<String, Long>> incrTimestampMap) {
+ public void setIncrTimestampMap(HashMap<TableName, HashMap<String, Long>> incrTimestampMap) {
this.backupImage.setIncrTimeRanges(incrTimestampMap);
}
- public Map<TableName, Map<String, Long>> getIncrTimestampMap() {
+ public Map<TableName, HashMap<String, Long>> getIncrTimestampMap() {
return backupImage.getIncrTimeRanges();
}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index 88093ba..3183ff4 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -34,9 +34,11 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
+
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -63,12 +65,14 @@ import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
@@ -174,10 +178,11 @@ public final class BackupSystemTable implements Closeable {
final static byte[] BL_PREPARE = Bytes.toBytes("R");
final static byte[] BL_COMMIT = Bytes.toBytes("D");
+ private final static String WALS_PREFIX = "wals:";
private final static String SET_KEY_PREFIX = "backupset:";
// separator between BULK_LOAD_PREFIX and ordinals
- private final static String BLK_LD_DELIM = ":";
+ protected final static String BLK_LD_DELIM = ":";
private final static byte[] EMPTY_VALUE = new byte[] {};
// Safe delimiter in a string
@@ -853,7 +858,7 @@ public final class BackupSystemTable implements Closeable {
* @throws IOException exception
*/
public void writeRegionServerLogTimestamp(Set<TableName> tables,
- Map<String, Long> newTimestamps, String backupRoot) throws IOException {
+ HashMap<String, Long> newTimestamps, String backupRoot) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("write RS log time stamps to backup system table for tables ["
+ StringUtils.join(tables, ",") + "]");
@@ -878,13 +883,13 @@ public final class BackupSystemTable implements Closeable {
* RegionServer,PreviousTimeStamp
* @throws IOException exception
*/
- public Map<TableName, Map<String, Long>> readLogTimestampMap(String backupRoot)
+ public HashMap<TableName, HashMap<String, Long>> readLogTimestampMap(String backupRoot)
throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("read RS log ts from backup system table for root=" + backupRoot);
}
- Map<TableName, Map<String, Long>> tableTimestampMap = new HashMap<>();
+ HashMap<TableName, HashMap<String, Long>> tableTimestampMap = new HashMap<>();
Scan scan = createScanForReadLogTimestampMap(backupRoot);
try (Table table = connection.getTable(tableName);
@@ -1007,6 +1012,148 @@ public final class BackupSystemTable implements Closeable {
}
/**
+ * Register WAL files as eligible for deletion
+ * @param files files
+ * @param backupId backup id
+ * @param backupRoot root directory path to backup destination
+ * @throws IOException exception
+ */
+ public void addWALFiles(List<String> files, String backupId, String backupRoot)
+ throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("add WAL files to backup system table: " + backupId + " " + backupRoot + " files ["
+ + StringUtils.join(files, ",") + "]");
+ }
+ if (LOG.isDebugEnabled()) {
+ files.forEach(file -> LOG.debug("add :" + file));
+ }
+ try (Table table = connection.getTable(tableName)) {
+ List<Put> puts = createPutsForAddWALFiles(files, backupId, backupRoot);
+ table.put(puts);
+ }
+ }
+
+ /**
+ * Get an iterator over the WAL files recorded in the backup system table.
+ * @param backupRoot root directory path to backup destination
+ * @return WAL files iterator
+ * @throws IOException exception
+ */
+ public Iterator<WALItem> getWALFilesIterator(String backupRoot) throws IOException {
+ LOG.trace("get WAL files from backup system table");
+
+ final Table table = connection.getTable(tableName);
+ Scan scan = createScanForGetWALs(backupRoot);
+ final ResultScanner scanner = table.getScanner(scan);
+ final Iterator<Result> it = scanner.iterator();
+ return new Iterator<WALItem>() {
+
+ @Override
+ public boolean hasNext() {
+ boolean next = it.hasNext();
+ if (!next) {
+ // close all
+ try {
+ scanner.close();
+ table.close();
+ } catch (IOException e) {
+ LOG.error("Close WAL Iterator", e);
+ }
+ }
+ return next;
+ }
+
+ @Override
+ public WALItem next() {
+ Result next = it.next();
+ List<Cell> cells = next.listCells();
+ byte[] buf = cells.get(0).getValueArray();
+ int len = cells.get(0).getValueLength();
+ int offset = cells.get(0).getValueOffset();
+ String backupId = new String(buf, offset, len);
+ buf = cells.get(1).getValueArray();
+ len = cells.get(1).getValueLength();
+ offset = cells.get(1).getValueOffset();
+ String walFile = new String(buf, offset, len);
+ buf = cells.get(2).getValueArray();
+ len = cells.get(2).getValueLength();
+ offset = cells.get(2).getValueOffset();
+ String backupRoot = new String(buf, offset, len);
+ return new WALItem(backupId, walFile, backupRoot);
+ }
+
+ @Override
+ public void remove() {
+ // not implemented
+ throw new RuntimeException("remove is not supported");
+ }
+ };
+ }
+
+ /**
+ * Check if a WAL file is eligible for deletion. Future: support all backup destinations.
+ * @param file name of the file to check
+ * @return true if deletable, false otherwise
+ * @throws IOException exception
+ */
+ // TODO: multiple backup destination support
+ public boolean isWALFileDeletable(String file) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Check if WAL file has been already backed up in backup system table " + file);
+ }
+ try (Table table = connection.getTable(tableName)) {
+ Get get = createGetForCheckWALFile(file);
+ Result res = table.get(get);
+ return (!res.isEmpty());
+ }
+ }
+
+ /**
+ * Check which WAL files are eligible for deletion, using a multi-get
+ * @param files statuses of the files to check
+ * @return map of results (key: FileStatus object. value: true if the file is deletable, false
+ * otherwise)
+ * @throws IOException exception
+ */
+ public Map<FileStatus, Boolean> areWALFilesDeletable(Iterable<FileStatus> files)
+ throws IOException {
+ final int BUF_SIZE = 100;
+
+ Map<FileStatus, Boolean> ret = new HashMap<>();
+ try (Table table = connection.getTable(tableName)) {
+ List<Get> getBuffer = new ArrayList<>();
+ List<FileStatus> fileStatuses = new ArrayList<>();
+
+ for (FileStatus file : files) {
+ String fn = file.getPath().getName();
+ if (fn.startsWith(WALProcedureStore.LOG_PREFIX)) {
+ ret.put(file, true);
+ continue;
+ }
+ String wal = file.getPath().toString();
+ Get get = createGetForCheckWALFile(wal);
+ getBuffer.add(get);
+ fileStatuses.add(file);
+ if (getBuffer.size() >= BUF_SIZE) {
+ Result[] results = table.get(getBuffer);
+ for (int i = 0; i < results.length; i++) {
+ ret.put(fileStatuses.get(i), !results[i].isEmpty());
+ }
+ getBuffer.clear();
+ fileStatuses.clear();
+ }
+ }
+
+ if (!getBuffer.isEmpty()) {
+ Result[] results = table.get(getBuffer);
+ for (int i = 0; i < results.length; i++) {
+ ret.put(fileStatuses.get(i), !results[i].isEmpty());
+ }
+ }
+ }
+ return ret;
+ }
+
+ /**
* Checks if we have at least one backup session in the backup system table. This API is used by
* BackupLogCleaner.
* @return true if at least one session exists in the backup system table
@@ -1506,7 +1653,7 @@ public final class BackupSystemTable implements Closeable {
}
}
- private static boolean snapshotExists(Admin admin, String snapshotName) throws IOException {
+ protected static boolean snapshotExists(Admin admin, String snapshotName) throws IOException {
List<SnapshotDescription> list = admin.listSnapshots();
for (SnapshotDescription desc : list) {
if (desc.getName().equals(snapshotName)) {
@@ -1761,6 +1908,56 @@ public final class BackupSystemTable implements Closeable {
}
/**
+ * Creates put list for list of WAL files
+ * @param files list of WAL file paths
+ * @param backupId backup id
+ * @param backupRoot root directory path to backup destination
+ * @return put list
+ */
+ private List<Put> createPutsForAddWALFiles(List<String> files, String backupId,
+ String backupRoot) {
+ List<Put> puts = new ArrayList<>(files.size());
+ for (String file : files) {
+ Put put = new Put(rowkey(WALS_PREFIX, BackupUtils.getUniqueWALFileNamePart(file)));
+ put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("backupId"),
+ Bytes.toBytes(backupId));
+ put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("file"), Bytes.toBytes(file));
+ put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("root"),
+ Bytes.toBytes(backupRoot));
+ puts.add(put);
+ }
+ return puts;
+ }
+
+ /**
+ * Creates Scan operation to load WALs
+ * @param backupRoot path to backup destination
+ * @return scan operation
+ */
+ private Scan createScanForGetWALs(String backupRoot) {
+ // TODO: support for backupRoot
+ Scan scan = new Scan();
+ byte[] startRow = Bytes.toBytes(WALS_PREFIX);
+ byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+ stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+ scan.withStartRow(startRow);
+ scan.withStopRow(stopRow);
+ scan.addFamily(BackupSystemTable.META_FAMILY);
+ return scan;
+ }
+
+ /**
+ * Creates Get operation for a given WAL file name. TODO: support for backup destination
+ * @param file file
+ * @return get operation
+ */
+ private Get createGetForCheckWALFile(String file) {
+ Get get = new Get(rowkey(WALS_PREFIX, BackupUtils.getUniqueWALFileNamePart(file)));
+ // add backup root column
+ get.addFamily(BackupSystemTable.META_FAMILY);
+ return get;
+ }
+
+ /**
* Creates Scan operation to load backup set list
* @return scan operation
*/
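The restored createScanForGetWALs() bounds the scan to the "wals:" prefix by copying the start row and incrementing its last byte, the standard HBase prefix-scan trick. Restated on its own:

    byte[] startRow = Bytes.toBytes("wals:");
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] += 1; // ':' (0x3a) becomes ';' (0x3b)
    // every key with the "wals:" prefix sorts inside [startRow, stopRow)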
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
index 6ad409e..5bf1373 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
@@ -27,6 +27,7 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CON
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.TableName;
@@ -159,6 +160,14 @@ public class FullTableBackupClient extends TableBackupClient {
LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
newTimestamps = backupManager.readRegionServerLastLogRollResult();
+ if (firstBackup) {
+ // Update the registered log files.
+ // We record ALL old WAL files as registered, because this is the first
+ // full backup in the system and these files are not needed for the next
+ // incremental backup.
+ List<String> logFiles = BackupUtils.getWALFilesOlderThan(conf, newTimestamps);
+ backupManager.recordWALFiles(logFiles);
+ }
// SNAPSHOT_TABLES:
backupInfo.setPhase(BackupPhase.SNAPSHOT);
@@ -186,10 +195,9 @@ public class FullTableBackupClient extends TableBackupClient {
// For incremental backup, it contains the incremental backup table set.
backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
- Map<TableName, Map<String, Long>> newTableSetTimestampMap =
+ HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
backupManager.readLogTimestampMap();
- backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
Long newStartCode =
BackupUtils.getMinValue(BackupUtils
.getRSLogTimestampMins(newTableSetTimestampMap));
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
index 847837f..93d264a 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
@@ -21,8 +21,11 @@ package org.apache.hadoop.hbase.backup.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
-import java.util.Map;
+import java.util.Set;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -30,6 +33,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable.WALItem;
import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
@@ -60,16 +64,16 @@ public class IncrementalBackupManager extends BackupManager {
* @return The new HashMap of RS log time stamps after the log roll for this incremental backup.
* @throws IOException exception
*/
- public Map<String, Long> getIncrBackupLogFileMap() throws IOException {
+ public HashMap<String, Long> getIncrBackupLogFileMap() throws IOException {
List<String> logList;
- Map<String, Long> newTimestamps;
- Map<String, Long> previousTimestampMins;
+ HashMap<String, Long> newTimestamps;
+ HashMap<String, Long> previousTimestampMins;
String savedStartCode = readBackupStartCode();
// key: tableName
// value: <RegionServer,PreviousTimeStamp>
- Map<TableName, Map<String, Long>> previousTimestampMap = readLogTimestampMap();
+ HashMap<TableName, HashMap<String, Long>> previousTimestampMap = readLogTimestampMap();
previousTimestampMins = BackupUtils.getRSLogTimestampMins(previousTimestampMap);
@@ -95,19 +99,68 @@ public class IncrementalBackupManager extends BackupManager {
newTimestamps = readRegionServerLastLogRollResult();
logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
- logList = excludeProcV2WALs(logList);
+ List<WALItem> logFromSystemTable =
+ getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps, getBackupInfo()
+ .getBackupRootDir());
+ logList = excludeAlreadyBackedUpAndProcV2WALs(logList, logFromSystemTable);
backupInfo.setIncrBackupFileList(logList);
return newTimestamps;
}
- private List<String> excludeProcV2WALs(List<String> logList) {
+ /**
+ * Get list of WAL files eligible for incremental backup.
+ *
+ * @return list of WAL files
+ * @throws IOException if getting the list of WAL files fails
+ */
+ public List<String> getIncrBackupLogFileList() throws IOException {
+ List<String> logList;
+ HashMap<String, Long> newTimestamps;
+ HashMap<String, Long> previousTimestampMins;
+
+ String savedStartCode = readBackupStartCode();
+
+ // key: tableName
+ // value: <RegionServer,PreviousTimeStamp>
+ HashMap<TableName, HashMap<String, Long>> previousTimestampMap = readLogTimestampMap();
+
+ previousTimestampMins = BackupUtils.getRSLogTimestampMins(previousTimestampMap);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("StartCode " + savedStartCode + "for backupID " + backupInfo.getBackupId());
+ }
+ // get all new log files from .logs and .oldlogs after last TS and before new timestamp
+ if (savedStartCode == null || previousTimestampMins == null
+ || previousTimestampMins.isEmpty()) {
+ throw new IOException(
+ "Cannot read any previous back up timestamps from backup system table. "
+ + "In order to create an incremental backup, at least one full backup is needed.");
+ }
+
+ newTimestamps = readRegionServerLastLogRollResult();
+
+ logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
+ List<WALItem> logFromSystemTable =
+ getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps, getBackupInfo()
+ .getBackupRootDir());
+
+ logList = excludeAlreadyBackedUpAndProcV2WALs(logList, logFromSystemTable);
+ backupInfo.setIncrBackupFileList(logList);
+
+ return logList;
+ }
+
+ private List<String> excludeAlreadyBackedUpAndProcV2WALs(List<String> logList,
+ List<WALItem> logFromSystemTable) {
+ Set<String> walFileNameSet = convertToSet(logFromSystemTable);
+
List<String> list = new ArrayList<>();
for (int i=0; i < logList.size(); i++) {
Path p = new Path(logList.get(i));
String name = p.getName();
- if (name.startsWith(WALProcedureStore.LOG_PREFIX)) {
+ if (walFileNameSet.contains(name) || name.startsWith(WALProcedureStore.LOG_PREFIX)) {
continue;
}
@@ -117,6 +170,65 @@ public class IncrementalBackupManager extends BackupManager {
}
/**
+ * Create Set of WAL file names (not full path names)
+ * @param logFromSystemTable the logs from the system table to convert
+ * @return set of WAL file names
+ */
+ private Set<String> convertToSet(List<WALItem> logFromSystemTable) {
+ Set<String> set = new HashSet<>();
+ for (int i=0; i < logFromSystemTable.size(); i++) {
+ WALItem item = logFromSystemTable.get(i);
+ set.add((new Path(item.walFile)).getName());
+ }
+ return set;
+ }
+
+ /**
+ * For each region server: get all log files newer than the last timestamps, but not newer than
+ * the newest timestamps.
+ * @param olderTimestamps timestamp map for each region server of the last backup.
+ * @param newestTimestamps timestamp map for each region server that the backup should lead to.
+ * @return list of log files which needs to be added to this backup
+ * @throws IOException if getting the WAL files from the backup system fails
+ */
+ private List<WALItem> getLogFilesFromBackupSystem(HashMap<String, Long> olderTimestamps,
+ HashMap<String, Long> newestTimestamps, String backupRoot) throws IOException {
+ List<WALItem> logFiles = new ArrayList<>();
+ Iterator<WALItem> it = getWALFilesFromBackupSystem();
+ while (it.hasNext()) {
+ WALItem item = it.next();
+ String rootDir = item.getBackupRoot();
+ if (!rootDir.equals(backupRoot)) {
+ continue;
+ }
+ String walFileName = item.getWalFile();
+ String server = BackupUtils.parseHostNameFromLogFile(new Path(walFileName));
+ if (server == null) {
+ continue;
+ }
+ Long tss = getTimestamp(walFileName);
+ Long oldTss = olderTimestamps.get(server);
+ Long newTss = newestTimestamps.get(server);
+ if (oldTss == null) {
+ logFiles.add(item);
+ continue;
+ }
+ if (newTss == null) {
+ newTss = Long.MAX_VALUE;
+ }
+ if (tss > oldTss && tss < newTss) {
+ logFiles.add(item);
+ }
+ }
+ return logFiles;
+ }
+
+ private Long getTimestamp(String walFileName) {
+ int index = walFileName.lastIndexOf(BackupUtils.LOGNAME_SEPARATOR);
+ return Long.parseLong(walFileName.substring(index + 1));
+ }
+
+ /**
* For each region server: get all log files newer than the last timestamps but not newer than the
* newest timestamps.
* @param olderTimestamps the timestamp for each region server of the last backup.
@@ -126,8 +238,8 @@ public class IncrementalBackupManager extends BackupManager {
* @return a list of log files to be backed up
* @throws IOException exception
*/
- private List<String> getLogFilesForNewBackup(Map<String, Long> olderTimestamps,
- Map<String, Long> newestTimestamps, Configuration conf, String savedStartCode)
+ private List<String> getLogFilesForNewBackup(HashMap<String, Long> olderTimestamps,
+ HashMap<String, Long> newestTimestamps, Configuration conf, String savedStartCode)
throws IOException {
LOG.debug("In getLogFilesForNewBackup()\n" + "olderTimestamps: " + olderTimestamps
+ "\n newestTimestamps: " + newestTimestamps);
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 918e99a..1fc428d 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -287,6 +288,8 @@ public class IncrementalTableBackupClient extends TableBackupClient {
convertWALsToHFiles();
incrementalCopyHFiles(new String[] {getBulkOutputDir().toString()},
backupInfo.getBackupRootDir());
+ // Save list of WAL files copied
+ backupManager.recordWALFiles(backupInfo.getIncrBackupFileList());
} catch (Exception e) {
String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId;
// fail the overall backup and return
@@ -298,7 +301,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
// After this checkpoint, even if entering cancel process, will let the backup finished
try {
// Set the previousTimestampMap which is before this current log roll to the manifest.
- Map<TableName, Map<String, Long>> previousTimestampMap =
+ HashMap<TableName, HashMap<String, Long>> previousTimestampMap =
backupManager.readLogTimestampMap();
backupInfo.setIncrTimestampMap(previousTimestampMap);
@@ -306,10 +309,9 @@ public class IncrementalTableBackupClient extends TableBackupClient {
// For incremental backup, it contains the incremental backup table set.
backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
- Map<TableName, Map<String, Long>> newTableSetTimestampMap =
+ HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
backupManager.readLogTimestampMap();
- backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
Long newStartCode =
BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
backupManager.writeBackupStartCode(newStartCode);
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
index 57f3a50..0213414 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -62,7 +61,7 @@ public abstract class TableBackupClient {
protected Connection conn;
protected String backupId;
protected List<TableName> tableList;
- protected Map<String, Long> newTimestamps = null;
+ protected HashMap<String, Long> newTimestamps = null;
protected BackupManager backupManager;
protected BackupInfo backupInfo;
@@ -295,7 +294,7 @@ public abstract class TableBackupClient {
if (type == BackupType.INCREMENTAL) {
// We'll store the log timestamps for this table only in its manifest.
- Map<TableName, Map<String, Long>> tableTimestampMap = new HashMap<>();
+ HashMap<TableName, HashMap<String, Long>> tableTimestampMap = new HashMap<>();
tableTimestampMap.put(table, backupInfo.getIncrTimestampMap().get(table));
manifest.setIncrTimestampMap(tableTimestampMap);
ArrayList<BackupImage> ancestorss = backupManager.getAncestors(backupInfo);
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
index 79404b3..8d9400f 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
@@ -21,29 +21,26 @@ package org.apache.hadoop.hbase.backup.master;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
-import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
-import org.apache.hadoop.hbase.net.Address;
-import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
+
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
/**
@@ -79,77 +76,46 @@ public class BackupLogCleaner extends BaseLogCleanerDelegate {
}
}
-
- private Map<Address, Long> getServersToOldestBackupMapping(List<BackupInfo> backups)
- throws IOException {
- Map<Address, Long> serverAddressToLastBackupMap = new HashMap<>();
-
- Map<TableName, Long> tableNameBackupInfoMap = new HashMap<>();
- for (BackupInfo backupInfo : backups) {
- for (TableName table : backupInfo.getTables()) {
- tableNameBackupInfoMap.putIfAbsent(table, backupInfo.getStartTs());
- if (tableNameBackupInfoMap.get(table) <= backupInfo.getStartTs()) {
- tableNameBackupInfoMap.put(table, backupInfo.getStartTs());
- for (Map.Entry<String, Long> entry : backupInfo.getTableSetTimestampMap().get(table)
- .entrySet()) {
- serverAddressToLastBackupMap.put(Address.fromString(entry.getKey()), entry.getValue());
- }
- }
- }
- }
-
- return serverAddressToLastBackupMap;
- }
-
@Override
public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
- List<FileStatus> filteredFiles = new ArrayList<>();
-
// all members of this class are null if backup is disabled,
// so we cannot filter the files
if (this.getConf() == null || !BackupManager.isBackupEnabled(getConf())) {
LOG.debug("Backup is not enabled. Check your {} setting",
- BackupRestoreConstants.BACKUP_ENABLE_KEY);
+ BackupRestoreConstants.BACKUP_ENABLE_KEY);
return files;
}
- Map<Address, Long> addressToLastBackupMap;
- try {
- try (BackupManager backupManager = new BackupManager(conn, getConf())) {
- addressToLastBackupMap =
- getServersToOldestBackupMapping(backupManager.getBackupHistory(true));
- }
- } catch (IOException ex) {
- LOG.error("Failed to analyse backup history with exception: {}. Retaining all logs",
- ex.getMessage(), ex);
- return Collections.emptyList();
- }
- for (FileStatus file : files) {
- String fn = file.getPath().getName();
- if (fn.startsWith(WALProcedureStore.LOG_PREFIX)) {
- filteredFiles.add(file);
- continue;
- }
-
+ try (final BackupSystemTable table = new BackupSystemTable(conn)) {
+ // If we do not have recorded backup sessions
try {
- Address walServerAddress =
- Address.fromString(BackupUtils.parseHostNameFromLogFile(file.getPath()));
- long walTimestamp = AbstractFSWALProvider.getTimestamp(file.getPath().getName());
-
- if (!addressToLastBackupMap.containsKey(walServerAddress)
- || addressToLastBackupMap.get(walServerAddress) >= walTimestamp) {
- filteredFiles.add(file);
+ if (!table.hasBackupSessions()) {
+ LOG.trace("BackupLogCleaner has no backup sessions");
+ return files;
+ }
+ } catch (TableNotFoundException tnfe) {
+ LOG.warn("Backup system table is not available: {}", tnfe.getMessage());
+ return files;
+ }
+ List<FileStatus> list = new ArrayList<>();
+ Map<FileStatus, Boolean> walFilesDeletableMap = table.areWALFilesDeletable(files);
+ for (Map.Entry<FileStatus, Boolean> entry: walFilesDeletableMap.entrySet()) {
+ FileStatus file = entry.getKey();
+ String wal = file.getPath().toString();
+ boolean deletable = entry.getValue();
+ if (deletable) {
+ LOG.debug("Found log file in backup system table, deleting: {}", wal);
+ list.add(file);
+ } else {
+ LOG.debug("Did not find this log in backup system table, keeping: {}", wal);
}
- } catch (Exception ex) {
- LOG.warn(
- "Error occurred while filtering file: {} with error: {}. Ignoring cleanup of this log",
- file.getPath(), ex.getMessage());
}
+ return list;
+ } catch (IOException e) {
+ LOG.error("Failed to get backup system table table, therefore will keep all files", e);
+ // nothing to delete
+ return Collections.emptyList();
}
-
- LOG
- .info("Total files: {}, Filtered Files: {}", IterableUtils.size(files), filteredFiles.size());
- return filteredFiles;
}
@Override
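Net effect of the cleaner rewrite: instead of reconstructing per-server backup boundaries from the backup history, getDeletableFiles() now simply asks the backup system table which candidate WALs have been recorded (areWALFilesDeletable batches the lookups 100 Gets at a time). Usage mirrors TestBackupLogCleaner further down; conf and walStatuses are placeholders:

    BackupLogCleaner cleaner = new BackupLogCleaner();
    cleaner.setConf(conf); // backup must be enabled for any filtering to happen
    cleaner.init(null);    // the restored cleaner no longer needs the MASTER param
    Iterable<FileStatus> deletable = cleaner.getDeletableFiles(walStatuses);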
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
index 90bb4fd..2f81bfc 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -27,7 +27,6 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -83,8 +82,8 @@ public final class BackupUtils {
* @param rsLogTimestampMap timestamp map
* @return the min timestamp of each RS
*/
- public static Map<String, Long> getRSLogTimestampMins(
- Map<TableName, Map<String, Long>> rsLogTimestampMap) {
+ public static HashMap<String, Long> getRSLogTimestampMins(
+ HashMap<TableName, HashMap<String, Long>> rsLogTimestampMap) {
if (rsLogTimestampMap == null || rsLogTimestampMap.isEmpty()) {
return null;
}
@@ -92,14 +91,18 @@ public final class BackupUtils {
HashMap<String, Long> rsLogTimestampMins = new HashMap<>();
HashMap<String, HashMap<TableName, Long>> rsLogTimestampMapByRS = new HashMap<>();
- for (Entry<TableName, Map<String, Long>> tableEntry : rsLogTimestampMap.entrySet()) {
+ for (Entry<TableName, HashMap<String, Long>> tableEntry : rsLogTimestampMap.entrySet()) {
TableName table = tableEntry.getKey();
- Map<String, Long> rsLogTimestamp = tableEntry.getValue();
+ HashMap<String, Long> rsLogTimestamp = tableEntry.getValue();
for (Entry<String, Long> rsEntry : rsLogTimestamp.entrySet()) {
String rs = rsEntry.getKey();
Long ts = rsEntry.getValue();
- rsLogTimestampMapByRS.putIfAbsent(rs, new HashMap<>());
- rsLogTimestampMapByRS.get(rs).put(table, ts);
+ if (!rsLogTimestampMapByRS.containsKey(rs)) {
+ rsLogTimestampMapByRS.put(rs, new HashMap<>());
+ rsLogTimestampMapByRS.get(rs).put(table, ts);
+ } else {
+ rsLogTimestampMapByRS.get(rs).put(table, ts);
+ }
}
}
@@ -346,7 +349,7 @@ public final class BackupUtils {
* @param map map
* @return the min value
*/
- public static <T> Long getMinValue(Map<T, Long> map) {
+ public static <T> Long getMinValue(HashMap<T, Long> map) {
Long minTimestamp = null;
if (map != null) {
ArrayList<Long> timestampList = new ArrayList<>(map.values());
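For orientation: getRSLogTimestampMins() flips the table -> (server -> ts) map into server -> min(ts) across tables, and getMinValue() then takes the overall minimum, which the backup clients write as the new backup start code. A small worked example with made-up data:

    HashMap<TableName, HashMap<String, Long>> byTable = new HashMap<>();
    byTable.put(TableName.valueOf("t1"), new HashMap<>(Map.of("rs1", 100L, "rs2", 300L)));
    byTable.put(TableName.valueOf("t2"), new HashMap<>(Map.of("rs1", 200L, "rs2", 250L)));
    HashMap<String, Long> mins = BackupUtils.getRSLogTimestampMins(byTable);
    // mins = {rs1=100, rs2=250}
    Long startCode = BackupUtils.getMinValue(mins); // 100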
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index 8a06425..80ffe55 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -27,7 +27,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
@@ -65,7 +64,6 @@ import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -94,6 +92,7 @@ public class TestBackupBase {
protected static TableName table1_restore = TableName.valueOf("default:table1");
protected static TableName table2_restore = TableName.valueOf("ns2:table2");
protected static TableName table3_restore = TableName.valueOf("ns3:table3_restore");
+ protected static TableName table4_restore = TableName.valueOf("ns4:table4_restore");
protected static final int NB_ROWS_IN_BATCH = 99;
protected static final byte[] qualName = Bytes.toBytes("q1");
@@ -136,12 +135,14 @@ public class TestBackupBase {
incrementalCopyHFiles(new String[] {getBulkOutputDir().toString()},
backupInfo.getBackupRootDir());
failStageIf(Stage.stage_2);
+ // Save list of WAL files copied
+ backupManager.recordWALFiles(backupInfo.getIncrBackupFileList());
// case INCR_BACKUP_COMPLETE:
// set overall backup status: complete. Here we make sure to complete the backup.
// After this checkpoint, even if entering cancel process, will let the backup finished
// Set the previousTimestampMap which is before this current log roll to the manifest.
- Map<TableName, Map<String, Long>> previousTimestampMap =
+ HashMap<TableName, HashMap<String, Long>> previousTimestampMap =
backupManager.readLogTimestampMap();
backupInfo.setIncrTimestampMap(previousTimestampMap);
@@ -150,7 +151,7 @@ public class TestBackupBase {
backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
failStageIf(Stage.stage_3);
- Map<TableName, Map<String, Long>> newTableSetTimestampMap =
+ HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
backupManager.readLogTimestampMap();
Long newStartCode =
@@ -211,6 +212,14 @@ public class TestBackupBase {
LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
failStageIf(Stage.stage_2);
newTimestamps = backupManager.readRegionServerLastLogRollResult();
+ if (firstBackup) {
+ // Update the registered log files.
+ // We record ALL old WAL files as registered, because this is the first
+ // full backup in the system and these files are not needed for the next
+ // incremental backup.
+ List<String> logFiles = BackupUtils.getWALFilesOlderThan(conf, newTimestamps);
+ backupManager.recordWALFiles(logFiles);
+ }
// SNAPSHOT_TABLES:
backupInfo.setPhase(BackupPhase.SNAPSHOT);
@@ -238,7 +247,7 @@ public class TestBackupBase {
// For incremental backup, it contains the incremental backup table set.
backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
- Map<TableName, Map<String, Long>> newTableSetTimestampMap =
+ HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
backupManager.readLogTimestampMap();
Long newStartCode =
@@ -421,10 +430,15 @@ public class TestBackupBase {
Admin ha = TEST_UTIL.getAdmin();
// Create namespaces
- ha.createNamespace(NamespaceDescriptor.create("ns1").build());
- ha.createNamespace(NamespaceDescriptor.create("ns2").build());
- ha.createNamespace(NamespaceDescriptor.create("ns3").build());
- ha.createNamespace(NamespaceDescriptor.create("ns4").build());
+ NamespaceDescriptor desc1 = NamespaceDescriptor.create("ns1").build();
+ NamespaceDescriptor desc2 = NamespaceDescriptor.create("ns2").build();
+ NamespaceDescriptor desc3 = NamespaceDescriptor.create("ns3").build();
+ NamespaceDescriptor desc4 = NamespaceDescriptor.create("ns4").build();
+
+ ha.createNamespace(desc1);
+ ha.createNamespace(desc2);
+ ha.createNamespace(desc3);
+ ha.createNamespace(desc4);
TableDescriptor desc = TableDescriptorBuilder.newBuilder(table1)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(famName)).build();
@@ -493,21 +507,6 @@ public class TestBackupBase {
return ret;
}
- protected List<FileStatus> getListOfWALFiles(Configuration c) throws IOException {
- Path logRoot = new Path(CommonFSUtils.getWALRootDir(c), HConstants.HREGION_LOGDIR_NAME);
- FileSystem fs = logRoot.getFileSystem(c);
- RemoteIterator<LocatedFileStatus> it = fs.listFiles(logRoot, true);
- List<FileStatus> logFiles = new ArrayList<FileStatus>();
- while (it.hasNext()) {
- LocatedFileStatus lfs = it.next();
- if (lfs.isFile() && !AbstractFSWALProvider.isMetaFile(lfs.getPath())) {
- logFiles.add(lfs);
- LOG.info(Objects.toString(lfs));
- }
- }
- return logFiles;
- }
-
protected void dumpBackupDir() throws IOException {
// Dump Backup Dir
FileSystem fs = FileSystem.get(conf1);
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupSystemTable.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupSystemTable.java
index e5a6679..b01a136 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupSystemTable.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupSystemTable.java
@@ -18,9 +18,11 @@
package org.apache.hadoop.hbase.backup;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -33,6 +35,8 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
@@ -266,12 +270,12 @@ public class TestBackupSystemTable {
table.writeRegionServerLogTimestamp(tables, rsTimestampMap, "root");
- Map<TableName, Map<String, Long>> result = table.readLogTimestampMap("root");
+ HashMap<TableName, HashMap<String, Long>> result = table.readLogTimestampMap("root");
assertTrue(tables.size() == result.size());
for (TableName t : tables) {
- Map<String, Long> rstm = result.get(t);
+ HashMap<String, Long> rstm = result.get(t);
assertNotNull(rstm);
assertEquals(rstm.get("rs1:100"), new Long(100L));
assertEquals(rstm.get("rs2:100"), new Long(101L));
@@ -297,7 +301,7 @@ public class TestBackupSystemTable {
assertTrue(5 == result.size());
for (TableName t : tables) {
- Map<String, Long> rstm = result.get(t);
+ HashMap<String, Long> rstm = result.get(t);
assertNotNull(rstm);
if (t.equals(TableName.valueOf("t3")) == false) {
assertEquals(rstm.get("rs1:100"), new Long(100L));
@@ -311,7 +315,7 @@ public class TestBackupSystemTable {
}
for (TableName t : tables1) {
- Map<String, Long> rstm = result.get(t);
+ HashMap<String, Long> rstm = result.get(t);
assertNotNull(rstm);
assertEquals(rstm.get("rs1:100"), new Long(200L));
assertEquals(rstm.get("rs2:100"), new Long(201L));
@@ -322,6 +326,43 @@ public class TestBackupSystemTable {
}
+ @Test
+ public void testAddWALFiles() throws IOException {
+ List<String> files =
+ Arrays.asList("hdfs://server/WALs/srv1,101,15555/srv1,101,15555.default.1",
+ "hdfs://server/WALs/srv2,102,16666/srv2,102,16666.default.2",
+ "hdfs://server/WALs/srv3,103,17777/srv3,103,17777.default.3");
+ String newFile = "hdfs://server/WALs/srv1,101,15555/srv1,101,15555.default.5";
+
+ table.addWALFiles(files, "backup", "root");
+
+ assertTrue(table.isWALFileDeletable(files.get(0)));
+ assertTrue(table.isWALFileDeletable(files.get(1)));
+ assertTrue(table.isWALFileDeletable(files.get(2)));
+ assertFalse(table.isWALFileDeletable(newFile));
+
+ // test areWALFilesDeletable
+ List<FileStatus> fileStatuses = new ArrayList<>();
+ for (String file : files) {
+ FileStatus fileStatus = new FileStatus();
+ fileStatus.setPath(new Path(file));
+ fileStatuses.add(fileStatus);
+ }
+
+ FileStatus newFileStatus = new FileStatus();
+ newFileStatus.setPath(new Path(newFile));
+ fileStatuses.add(newFileStatus);
+
+ Map<FileStatus, Boolean> walFilesDeletableMap = table.areWALFilesDeletable(fileStatuses);
+
+ assertTrue(walFilesDeletableMap.get(fileStatuses.get(0)));
+ assertTrue(walFilesDeletableMap.get(fileStatuses.get(1)));
+ assertTrue(walFilesDeletableMap.get(fileStatuses.get(2)));
+ assertFalse(walFilesDeletableMap.get(newFileStatus));
+
+ cleanBackupTable();
+ }
+
/**
* Backup set tests
*/
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java
index 5363b1a..6b8011e 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java
@@ -19,11 +19,19 @@ package org.apache.hadoop.hbase.backup.master;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import java.util.HashMap;
+
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
+import java.util.Objects;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.TestBackupBase;
@@ -32,14 +40,16 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -54,7 +64,6 @@ public class TestBackupLogCleaner extends TestBackupBase {
// implements all test cases in 1 test since incremental full backup/
// incremental backup has dependencies
-
@Test
public void testBackupLogCleaner() throws Exception {
@@ -68,11 +77,10 @@ public class TestBackupLogCleaner extends TestBackupBase {
assertFalse(systemTable.hasBackupSessions());
List<FileStatus> walFiles = getListOfWALFiles(TEST_UTIL.getConfiguration());
+ List<String> swalFiles = convert(walFiles);
BackupLogCleaner cleaner = new BackupLogCleaner();
cleaner.setConf(TEST_UTIL.getConfiguration());
- Map<String, Object> params = new HashMap<>();
- params.put(HMaster.MASTER, TEST_UTIL.getHBaseCluster().getMaster());
- cleaner.init(params);
+ cleaner.init(null);
cleaner.setConf(TEST_UTIL.getConfiguration());
Iterable<FileStatus> deletable = cleaner.getDeletableFiles(walFiles);
@@ -81,6 +89,7 @@ public class TestBackupLogCleaner extends TestBackupBase {
// We can delete all files because we have not yet recorded any backup sessions
assertTrue(size == walFiles.size());
+ systemTable.addWALFiles(swalFiles, "backup", "root");
String backupIdFull = fullTableBackup(tableSetFullList);
assertTrue(checkSucceeded(backupIdFull));
// Check one more time
@@ -91,6 +100,7 @@ public class TestBackupLogCleaner extends TestBackupBase {
List<FileStatus> newWalFiles = getListOfWALFiles(TEST_UTIL.getConfiguration());
LOG.debug("WAL list after full backup");
+ convert(newWalFiles);
// New list of wal files is greater than the previous one,
// because new wal per RS have been opened after full backup
@@ -130,4 +140,29 @@ public class TestBackupLogCleaner extends TestBackupBase {
conn.close();
}
}
+
+ private List<String> convert(List<FileStatus> walFiles) {
+ List<String> result = new ArrayList<String>();
+ for (FileStatus fs : walFiles) {
+ LOG.debug("+++WAL: " + fs.getPath().toString());
+ result.add(fs.getPath().toString());
+ }
+ return result;
+ }
+
+ private List<FileStatus> getListOfWALFiles(Configuration c) throws IOException {
+ Path logRoot = new Path(CommonFSUtils.getWALRootDir(c), HConstants.HREGION_LOGDIR_NAME);
+ FileSystem fs = logRoot.getFileSystem(c);
+ RemoteIterator<LocatedFileStatus> it = fs.listFiles(logRoot, true);
+ List<FileStatus> logFiles = new ArrayList<FileStatus>();
+ while (it.hasNext()) {
+ LocatedFileStatus lfs = it.next();
+ if (lfs.isFile() && !AbstractFSWALProvider.isMetaFile(lfs.getPath())) {
+ logFiles.add(lfs);
+ LOG.info(Objects.toString(lfs));
+ }
+ }
+ return logFiles;
+ }
+
}
diff --git a/hbase-protocol-shaded/src/main/protobuf/Backup.proto b/hbase-protocol-shaded/src/main/protobuf/Backup.proto
index afe4312..6084e17 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Backup.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Backup.proto
@@ -91,11 +91,7 @@ message BackupInfo {
optional uint32 progress = 10;
optional uint32 workers_number = 11;
optional uint64 bandwidth = 12;
- map<string, RSTimestampMap> table_set_timestamp = 13;
- message RSTimestampMap {
- map<string, uint64> rs_timestamp = 1;
- }
/**
* Backup session states
*/