You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2017/03/10 23:37:52 UTC
[06/10] hbase git commit: HBASE-14123 HBase Backup/Restore Phase 2
(Vladimir Rodionov)
http://git-wip-us.apache.org/repos/asf/hbase/blob/3aaea8e0/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
new file mode 100644
index 0000000..b8adac9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
@@ -0,0 +1,666 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+
+/**
+ * Backup manifest contains all the meta data of a backup image. The manifest info will be bundled
+ * as manifest file together with data. So that each backup image will contain all the info needed
+ * for restore. BackupManifest is a storage container for BackupImage.
+ * It is responsible for storing/reading backup image data and has some additional utility methods.
+ *
+ */
+@InterfaceAudience.Private
+public class BackupManifest {
+
+ private static final Log LOG = LogFactory.getLog(BackupManifest.class);
+
+ // manifest file name
+ public static final String MANIFEST_FILE_NAME = ".backup.manifest";
+
+ /**
+ * Backup image, the dependency graph is made up by series of backup images
+ * BackupImage contains all the relevant information to restore the backup and
+ * is used during restore operation
+ */
+
+ public static class BackupImage implements Comparable<BackupImage> {
+
+ static class Builder {
+ BackupImage image;
+
+ Builder() {
+ image = new BackupImage();
+ }
+
+ Builder withBackupId(String backupId) {
+ image.setBackupId(backupId);
+ return this;
+ }
+
+ Builder withType(BackupType type) {
+ image.setType(type);
+ return this;
+ }
+
+ Builder withRootDir(String rootDir) {
+ image.setRootDir(rootDir);
+ return this;
+ }
+
+ Builder withTableList(List<TableName> tableList) {
+ image.setTableList(tableList);
+ return this;
+ }
+
+ Builder withStartTime(long startTime) {
+ image.setStartTs(startTime);
+ return this;
+ }
+
+ Builder withCompleteTime(long completeTime) {
+ image.setCompleteTs(completeTime);
+ return this;
+ }
+
+ BackupImage build() {
+ return image;
+ }
+
+ }
+
+ private String backupId;
+ private BackupType type;
+ private String rootDir;
+ private List<TableName> tableList;
+ private long startTs;
+ private long completeTs;
+ private ArrayList<BackupImage> ancestors;
+ private HashMap<TableName, HashMap<String, Long>> incrTimeRanges;
+
+ static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public BackupImage() {
+ super();
+ }
+
+ private BackupImage(String backupId, BackupType type, String rootDir,
+ List<TableName> tableList, long startTs, long completeTs) {
+ this.backupId = backupId;
+ this.type = type;
+ this.rootDir = rootDir;
+ this.tableList = tableList;
+ this.startTs = startTs;
+ this.completeTs = completeTs;
+ }
+
+ static BackupImage fromProto(BackupProtos.BackupImage im) {
+ String backupId = im.getBackupId();
+ String rootDir = im.getBackupRootDir();
+ long startTs = im.getStartTs();
+ long completeTs = im.getCompleteTs();
+ List<HBaseProtos.TableName> tableListList = im.getTableListList();
+ List<TableName> tableList = new ArrayList<TableName>();
+ for (HBaseProtos.TableName tn : tableListList) {
+ tableList.add(ProtobufUtil.toTableName(tn));
+ }
+
+ List<BackupProtos.BackupImage> ancestorList = im.getAncestorsList();
+
+ BackupType type =
+ im.getBackupType() == BackupProtos.BackupType.FULL ? BackupType.FULL
+ : BackupType.INCREMENTAL;
+
+ BackupImage image = new BackupImage(backupId, type, rootDir, tableList, startTs, completeTs);
+ for (BackupProtos.BackupImage img : ancestorList) {
+ image.addAncestor(fromProto(img));
+ }
+ image.setIncrTimeRanges(loadIncrementalTimestampMap(im));
+ return image;
+ }
+
+ BackupProtos.BackupImage toProto() {
+ BackupProtos.BackupImage.Builder builder = BackupProtos.BackupImage.newBuilder();
+ builder.setBackupId(backupId);
+ builder.setCompleteTs(completeTs);
+ builder.setStartTs(startTs);
+ builder.setBackupRootDir(rootDir);
+ if (type == BackupType.FULL) {
+ builder.setBackupType(BackupProtos.BackupType.FULL);
+ } else {
+ builder.setBackupType(BackupProtos.BackupType.INCREMENTAL);
+ }
+
+ for (TableName name : tableList) {
+ builder.addTableList(ProtobufUtil.toProtoTableName(name));
+ }
+
+ if (ancestors != null) {
+ for (BackupImage im : ancestors) {
+ builder.addAncestors(im.toProto());
+ }
+ }
+
+ setIncrementalTimestampMap(builder);
+ return builder.build();
+ }
+
+ private static HashMap<TableName, HashMap<String, Long>> loadIncrementalTimestampMap(
+ BackupProtos.BackupImage proto) {
+ List<BackupProtos.TableServerTimestamp> list = proto.getTstMapList();
+
+ HashMap<TableName, HashMap<String, Long>> incrTimeRanges =
+ new HashMap<TableName, HashMap<String, Long>>();
+ if (list == null || list.size() == 0) return incrTimeRanges;
+ for (BackupProtos.TableServerTimestamp tst : list) {
+ TableName tn = ProtobufUtil.toTableName(tst.getTableName());
+ HashMap<String, Long> map = incrTimeRanges.get(tn);
+ if (map == null) {
+ map = new HashMap<String, Long>();
+ incrTimeRanges.put(tn, map);
+ }
+ List<BackupProtos.ServerTimestamp> listSt = tst.getServerTimestampList();
+ for (BackupProtos.ServerTimestamp stm : listSt) {
+ ServerName sn = ProtobufUtil.toServerName(stm.getServerName());
+ map.put(sn.getHostname() + ":" + sn.getPort(), stm.getTimestamp());
+ }
+ }
+ return incrTimeRanges;
+ }
+
+ private void setIncrementalTimestampMap(BackupProtos.BackupImage.Builder builder) {
+ if (this.incrTimeRanges == null) {
+ return;
+ }
+ for (Entry<TableName, HashMap<String, Long>> entry : this.incrTimeRanges.entrySet()) {
+ TableName key = entry.getKey();
+ HashMap<String, Long> value = entry.getValue();
+ BackupProtos.TableServerTimestamp.Builder tstBuilder =
+ BackupProtos.TableServerTimestamp.newBuilder();
+ tstBuilder.setTableName(ProtobufUtil.toProtoTableName(key));
+
+ for (Map.Entry<String, Long> entry2 : value.entrySet()) {
+ String s = entry2.getKey();
+ BackupProtos.ServerTimestamp.Builder stBuilder =
+ BackupProtos.ServerTimestamp.newBuilder();
+ HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder();
+ ServerName sn = ServerName.parseServerName(s);
+ snBuilder.setHostName(sn.getHostname());
+ snBuilder.setPort(sn.getPort());
+ stBuilder.setServerName(snBuilder.build());
+ stBuilder.setTimestamp(entry2.getValue());
+ tstBuilder.addServerTimestamp(stBuilder.build());
+ }
+ builder.addTstMap(tstBuilder.build());
+ }
+ }
+
+ public String getBackupId() {
+ return backupId;
+ }
+
+ private void setBackupId(String backupId) {
+ this.backupId = backupId;
+ }
+
+ public BackupType getType() {
+ return type;
+ }
+
+ private void setType(BackupType type) {
+ this.type = type;
+ }
+
+ public String getRootDir() {
+ return rootDir;
+ }
+
+ private void setRootDir(String rootDir) {
+ this.rootDir = rootDir;
+ }
+
+ public List<TableName> getTableNames() {
+ return tableList;
+ }
+
+ private void setTableList(List<TableName> tableList) {
+ this.tableList = tableList;
+ }
+
+ public long getStartTs() {
+ return startTs;
+ }
+
+ private void setStartTs(long startTs) {
+ this.startTs = startTs;
+ }
+
+ public long getCompleteTs() {
+ return completeTs;
+ }
+
+ private void setCompleteTs(long completeTs) {
+ this.completeTs = completeTs;
+ }
+
+ public ArrayList<BackupImage> getAncestors() {
+ if (this.ancestors == null) {
+ this.ancestors = new ArrayList<BackupImage>();
+ }
+ return this.ancestors;
+ }
+
+ private void addAncestor(BackupImage backupImage) {
+ this.getAncestors().add(backupImage);
+ }
+
+ public boolean hasAncestor(String token) {
+ for (BackupImage image : this.getAncestors()) {
+ if (image.getBackupId().equals(token)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean hasTable(TableName table) {
+ return tableList.contains(table);
+ }
+
+ @Override
+ public int compareTo(BackupImage other) {
+ String thisBackupId = this.getBackupId();
+ String otherBackupId = other.getBackupId();
+ int index1 = thisBackupId.lastIndexOf("_");
+ int index2 = otherBackupId.lastIndexOf("_");
+ String name1 = thisBackupId.substring(0, index1);
+ String name2 = otherBackupId.substring(0, index2);
+ if (name1.equals(name2)) {
+ Long thisTS = Long.valueOf(thisBackupId.substring(index1 + 1));
+ Long otherTS = Long.valueOf(otherBackupId.substring(index2 + 1));
+ return thisTS.compareTo(otherTS);
+ } else {
+ return name1.compareTo(name2);
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof BackupImage) {
+ return this.compareTo((BackupImage) obj) == 0;
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ int hash = 33 * this.getBackupId().hashCode() + type.hashCode();
+ hash = 33 * hash + rootDir.hashCode();
+ hash = 33 * hash + Long.valueOf(startTs).hashCode();
+ hash = 33 * hash + Long.valueOf(completeTs).hashCode();
+ for (TableName table : tableList) {
+ hash = 33 * hash + table.hashCode();
+ }
+ return hash;
+ }
+
+ public HashMap<TableName, HashMap<String, Long>> getIncrTimeRanges() {
+ return incrTimeRanges;
+ }
+
+ private void setIncrTimeRanges(HashMap<TableName, HashMap<String, Long>> incrTimeRanges) {
+ this.incrTimeRanges = incrTimeRanges;
+ }
+ }
+
+ // backup image directory
+ private String tableBackupDir = null;
+ private BackupImage backupImage;
+
+ /**
+ * Construct manifest for a ongoing backup.
+ * @param backup The ongoing backup info
+ */
+ public BackupManifest(BackupInfo backup) {
+
+ BackupImage.Builder builder = BackupImage.newBuilder();
+ this.backupImage =
+ builder.withBackupId(backup.getBackupId()).withType(backup.getType())
+ .withRootDir(backup.getBackupRootDir()).withTableList(backup.getTableNames())
+ .withStartTime(backup.getStartTs()).withCompleteTime(backup.getCompleteTs()).build();
+ }
+
+ /**
+ * Construct a table level manifest for a backup of the named table.
+ * @param backup The ongoing backup session info
+ */
+ public BackupManifest(BackupInfo backup, TableName table) {
+ this.tableBackupDir = backup.getTableBackupDir(table);
+ List<TableName> tables = new ArrayList<TableName>();
+ tables.add(table);
+ BackupImage.Builder builder = BackupImage.newBuilder();
+ this.backupImage =
+ builder.withBackupId(backup.getBackupId()).withType(backup.getType())
+ .withRootDir(backup.getBackupRootDir()).withTableList(tables)
+ .withStartTime(backup.getStartTs()).withCompleteTime(backup.getCompleteTs()).build();
+ }
+
+ /**
+ * Construct manifest from a backup directory.
+ * @param conf configuration
+ * @param backupPath backup path
+ * @throws IOException
+ */
+
+ public BackupManifest(Configuration conf, Path backupPath) throws IOException {
+ this(backupPath.getFileSystem(conf), backupPath);
+ }
+
+ /**
+ * Construct manifest from a backup directory.
+ * @param fs the FileSystem
+ * @param backupPath backup path
+ * @throws BackupException exception
+ */
+
+ public BackupManifest(FileSystem fs, Path backupPath) throws BackupException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Loading manifest from: " + backupPath.toString());
+ }
+ // The input backupDir may not exactly be the backup table dir.
+ // It could be the backup log dir where there is also a manifest file stored.
+ // This variable's purpose is to keep the correct and original location so
+ // that we can store/persist it.
+ try {
+
+ FileStatus[] subFiles = BackupUtils.listStatus(fs, backupPath, null);
+ if (subFiles == null) {
+ String errorMsg = backupPath.toString() + " does not exist";
+ LOG.error(errorMsg);
+ throw new IOException(errorMsg);
+ }
+ for (FileStatus subFile : subFiles) {
+ if (subFile.getPath().getName().equals(MANIFEST_FILE_NAME)) {
+
+ // load and set manifest field from file content
+ FSDataInputStream in = fs.open(subFile.getPath());
+ long len = subFile.getLen();
+ byte[] pbBytes = new byte[(int) len];
+ in.readFully(pbBytes);
+ BackupProtos.BackupImage proto = null;
+ try {
+ proto = BackupProtos.BackupImage.parseFrom(pbBytes);
+ } catch (Exception e) {
+ throw new BackupException(e);
+ }
+ this.backupImage = BackupImage.fromProto(proto);
+ LOG.debug("Loaded manifest instance from manifest file: "
+ + BackupUtils.getPath(subFile.getPath()));
+ return;
+ }
+ }
+ String errorMsg = "No manifest file found in: " + backupPath.toString();
+ throw new IOException(errorMsg);
+
+ } catch (IOException e) {
+ throw new BackupException(e.getMessage());
+ }
+ }
+
+ public BackupType getType() {
+ return backupImage.getType();
+ }
+
+ /**
+ * Get the table set of this image.
+ * @return The table set list
+ */
+ public List<TableName> getTableList() {
+ return backupImage.getTableNames();
+ }
+
+ /**
+ * Persist the manifest file.
+ * @throws IOException IOException when storing the manifest file.
+ */
+
+ public void store(Configuration conf) throws BackupException {
+ byte[] data = backupImage.toProto().toByteArray();
+ // write the file, overwrite if already exist
+ String logBackupDir =
+ BackupUtils.getLogBackupDir(backupImage.getRootDir(), backupImage.getBackupId());
+ Path manifestFilePath =
+ new Path(new Path((tableBackupDir != null ? tableBackupDir : logBackupDir)),
+ MANIFEST_FILE_NAME);
+ try (FSDataOutputStream out =
+ manifestFilePath.getFileSystem(conf).create(manifestFilePath, true);) {
+ out.write(data);
+ } catch (IOException e) {
+ throw new BackupException(e.getMessage());
+ }
+
+ LOG.info("Manifest file stored to " + manifestFilePath);
+ }
+
+ /**
+ * Get this backup image.
+ * @return the backup image.
+ */
+ public BackupImage getBackupImage() {
+ return backupImage;
+ }
+
+ /**
+ * Add dependent backup image for this backup.
+ * @param image The direct dependent backup image
+ */
+ public void addDependentImage(BackupImage image) {
+ this.backupImage.addAncestor(image);
+ }
+
+ /**
+ * Set the incremental timestamp map directly.
+ * @param incrTimestampMap timestamp map
+ */
+ public void setIncrTimestampMap(HashMap<TableName, HashMap<String, Long>> incrTimestampMap) {
+ this.backupImage.setIncrTimeRanges(incrTimestampMap);
+ }
+
+ public Map<TableName, HashMap<String, Long>> getIncrTimestampMap() {
+ return backupImage.getIncrTimeRanges();
+ }
+
+ /**
+ * Get the image list of this backup for restore in time order.
+ * @param reverse If true, then output in reverse order, otherwise in time order from old to new
+ * @return the backup image list for restore in time order
+ */
+ public ArrayList<BackupImage> getRestoreDependentList(boolean reverse) {
+ TreeMap<Long, BackupImage> restoreImages = new TreeMap<Long, BackupImage>();
+ restoreImages.put(backupImage.startTs, backupImage);
+ for (BackupImage image : backupImage.getAncestors()) {
+ restoreImages.put(Long.valueOf(image.startTs), image);
+ }
+ return new ArrayList<BackupImage>(reverse ? (restoreImages.descendingMap().values())
+ : (restoreImages.values()));
+ }
+
+ /**
+ * Get the dependent image list for a specific table of this backup in time order from old to new
+ * if want to restore to this backup image level.
+ * @param table table
+ * @return the backup image list for a table in time order
+ */
+ public ArrayList<BackupImage> getDependentListByTable(TableName table) {
+ ArrayList<BackupImage> tableImageList = new ArrayList<BackupImage>();
+ ArrayList<BackupImage> imageList = getRestoreDependentList(true);
+ for (BackupImage image : imageList) {
+ if (image.hasTable(table)) {
+ tableImageList.add(image);
+ if (image.getType() == BackupType.FULL) {
+ break;
+ }
+ }
+ }
+ Collections.reverse(tableImageList);
+ return tableImageList;
+ }
+
+ /**
+ * Get the full dependent image list in the whole dependency scope for a specific table of this
+ * backup in time order from old to new.
+ * @param table table
+ * @return the full backup image list for a table in time order in the whole scope of the
+ * dependency of this image
+ */
+ public ArrayList<BackupImage> getAllDependentListByTable(TableName table) {
+ ArrayList<BackupImage> tableImageList = new ArrayList<BackupImage>();
+ ArrayList<BackupImage> imageList = getRestoreDependentList(false);
+ for (BackupImage image : imageList) {
+ if (image.hasTable(table)) {
+ tableImageList.add(image);
+ }
+ }
+ return tableImageList;
+ }
+
+ /**
+ * Check whether backup image1 could cover backup image2 or not.
+ * @param image1 backup image 1
+ * @param image2 backup image 2
+ * @return true if image1 can cover image2, otherwise false
+ */
+ public static boolean canCoverImage(BackupImage image1, BackupImage image2) {
+ // image1 can cover image2 only when the following conditions are satisfied:
+ // - image1 must not be an incremental image;
+ // - image1 must be taken after image2 has been taken;
+ // - table set of image1 must cover the table set of image2.
+ if (image1.getType() == BackupType.INCREMENTAL) {
+ return false;
+ }
+ if (image1.getStartTs() < image2.getStartTs()) {
+ return false;
+ }
+ List<TableName> image1TableList = image1.getTableNames();
+ List<TableName> image2TableList = image2.getTableNames();
+ boolean found = false;
+ for (int i = 0; i < image2TableList.size(); i++) {
+ found = false;
+ for (int j = 0; j < image1TableList.size(); j++) {
+ if (image2TableList.get(i).equals(image1TableList.get(j))) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ return false;
+ }
+ }
+
+ LOG.debug("Backup image " + image1.getBackupId() + " can cover " + image2.getBackupId());
+ return true;
+ }
+
+ /**
+ * Check whether backup image set could cover a backup image or not.
+ * @param fullImages The backup image set
+ * @param image The target backup image
+ * @return true if fullImages can cover image, otherwise false
+ */
+ public static boolean canCoverImage(ArrayList<BackupImage> fullImages, BackupImage image) {
+ // fullImages can cover image only when the following conditions are satisfied:
+ // - each image of fullImages must not be an incremental image;
+ // - each image of fullImages must be taken after image has been taken;
+ // - sum table set of fullImages must cover the table set of image.
+ for (BackupImage image1 : fullImages) {
+ if (image1.getType() == BackupType.INCREMENTAL) {
+ return false;
+ }
+ if (image1.getStartTs() < image.getStartTs()) {
+ return false;
+ }
+ }
+
+ ArrayList<String> image1TableList = new ArrayList<String>();
+ for (BackupImage image1 : fullImages) {
+ List<TableName> tableList = image1.getTableNames();
+ for (TableName table : tableList) {
+ image1TableList.add(table.getNameAsString());
+ }
+ }
+ ArrayList<String> image2TableList = new ArrayList<String>();
+ List<TableName> tableList = image.getTableNames();
+ for (TableName table : tableList) {
+ image2TableList.add(table.getNameAsString());
+ }
+
+ for (int i = 0; i < image2TableList.size(); i++) {
+ if (image1TableList.contains(image2TableList.get(i)) == false) {
+ return false;
+ }
+ }
+
+ LOG.debug("Full image set can cover image " + image.getBackupId());
+ return true;
+ }
+
+ public BackupInfo toBackupInfo() {
+ BackupInfo info = new BackupInfo();
+ info.setType(backupImage.getType());
+ List<TableName> list = backupImage.getTableNames();
+ TableName[] tables = new TableName[list.size()];
+ info.addTables(list.toArray(tables));
+ info.setBackupId(backupImage.getBackupId());
+ info.setStartTs(backupImage.getStartTs());
+ info.setBackupRootDir(backupImage.getRootDir());
+ if (backupImage.getType() == BackupType.INCREMENTAL) {
+ info.setHLogTargetDir(BackupUtils.getLogBackupDir(backupImage.getRootDir(),
+ backupImage.getBackupId()));
+ }
+ return info;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3aaea8e0/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
new file mode 100644
index 0000000..6362f8e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -0,0 +1,1376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
+import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * This class provides API to access backup system table<br>
+ *
+ * Backup system table schema:<br>
+ * <p><ul>
+ * <li>1. Backup sessions rowkey= "session:"+backupId; value =serialized BackupInfo</li>
+ * <li>2. Backup start code rowkey = "startcode:"+backupRoot; value = startcode</li>
+ * <li>3. Incremental backup set rowkey="incrbackupset:"+backupRoot; value=[list of tables]</li>
+ * <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name;
+ * value = map[RS-> last WAL timestamp]</li>
+ * <li>5. RS - WAL ts map rowkey="rslogts:"+backupRoot +server; value = last WAL timestamp</li>
+ * <li>6. WALs recorded rowkey="wals:"+WAL unique file name;
+ * value = backupId and full WAL file name</li>
+ * </ul></p>
+ */
+@InterfaceAudience.Private
+public final class BackupSystemTable implements Closeable {
+
+ static class WALItem {
+ String backupId;
+ String walFile;
+ String backupRoot;
+
+ WALItem(String backupId, String walFile, String backupRoot) {
+ this.backupId = backupId;
+ this.walFile = walFile;
+ this.backupRoot = backupRoot;
+ }
+
+ public String getBackupId() {
+ return backupId;
+ }
+
+ public String getWalFile() {
+ return walFile;
+ }
+
+ public String getBackupRoot() {
+ return backupRoot;
+ }
+
+ @Override
+ public String toString() {
+ return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile;
+ }
+
+ }
+
+ private static final Log LOG = LogFactory.getLog(BackupSystemTable.class);
+
+ private TableName tableName;
+ /**
+ * Stores backup sessions (contexts)
+ */
+ final static byte[] SESSIONS_FAMILY = "session".getBytes();
+ /**
+ * Stores other meta
+ */
+ final static byte[] META_FAMILY = "meta".getBytes();
+ /**
+ * Connection to HBase cluster, shared among all instances
+ */
+ private final Connection connection;
+
+
+ private final static String BACKUP_INFO_PREFIX = "session:";
+ private final static String START_CODE_ROW = "startcode:";
+ private final static String INCR_BACKUP_SET = "incrbackupset:";
+ private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
+ private final static String RS_LOG_TS_PREFIX = "rslogts:";
+ private final static String WALS_PREFIX = "wals:";
+ private final static String SET_KEY_PREFIX = "backupset:";
+
+ private final static byte[] EMPTY_VALUE = new byte[] {};
+
+ // Safe delimiter in a string
+ private final static String NULL = "\u0000";
+
+ public BackupSystemTable(Connection conn) throws IOException {
+ this.connection = conn;
+ tableName = BackupSystemTable.getTableName(conn.getConfiguration());
+ checkSystemTable();
+ }
+
+ private void checkSystemTable() throws IOException {
+ try (Admin admin = connection.getAdmin();) {
+
+ if (!admin.tableExists(tableName)) {
+ HTableDescriptor backupHTD =
+ BackupSystemTable.getSystemTableDescriptor(connection.getConfiguration());
+ admin.createTable(backupHTD);
+ }
+ waitForSystemTable(admin);
+ }
+ }
+
+ private void waitForSystemTable(Admin admin) throws IOException {
+ long TIMEOUT = 60000;
+ long startTime = EnvironmentEdgeManager.currentTime();
+ while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ }
+ if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) {
+ throw new IOException("Failed to create backup system table after "+ TIMEOUT+"ms");
+ }
+ }
+ LOG.debug("Backup table exists and available");
+
+ }
+
+
+
+ @Override
+ public void close() {
+ // do nothing
+ }
+
+ /**
+ * Updates status (state) of a backup session in backup system table table
+ * @param info backup info
+ * @throws IOException exception
+ */
+ public void updateBackupInfo(BackupInfo info) throws IOException {
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("update backup status in backup system table for: " + info.getBackupId()
+ + " set status=" + info.getState());
+ }
+ try (Table table = connection.getTable(tableName)) {
+ Put put = createPutForBackupInfo(info);
+ table.put(put);
+ }
+ }
+
+ /**
+ * Deletes backup status from backup system table table
+ * @param backupId backup id
+ * @throws IOException exception
+ */
+
+ public void deleteBackupInfo(String backupId) throws IOException {
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("delete backup status in backup system table for " + backupId);
+ }
+ try (Table table = connection.getTable(tableName)) {
+ Delete del = createDeleteForBackupInfo(backupId);
+ table.delete(del);
+ }
+ }
+
+ /**
+ * Reads backup status object (instance of backup info) from backup system table table
+ * @param backupId backup id
+ * @return Current status of backup session or null
+ */
+
+ public BackupInfo readBackupInfo(String backupId) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("read backup status from backup system table for: " + backupId);
+ }
+
+ try (Table table = connection.getTable(tableName)) {
+ Get get = createGetForBackupInfo(backupId);
+ Result res = table.get(get);
+ if (res.isEmpty()) {
+ return null;
+ }
+ return resultToBackupInfo(res);
+ }
+ }
+
+ /**
+ * Read the last backup start code (timestamp) of last successful backup. Will return null if
+ * there is no start code stored on hbase or the value is of length 0. These two cases indicate
+ * there is no successful backup completed so far.
+ * @param backupRoot directory path to backup destination
+ * @return the timestamp of last successful backup
+ * @throws IOException exception
+ */
+ public String readBackupStartCode(String backupRoot) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("read backup start code from backup system table");
+ }
+ try (Table table = connection.getTable(tableName)) {
+ Get get = createGetForStartCode(backupRoot);
+ Result res = table.get(get);
+ if (res.isEmpty()) {
+ return null;
+ }
+ Cell cell = res.listCells().get(0);
+ byte[] val = CellUtil.cloneValue(cell);
+ if (val.length == 0) {
+ return null;
+ }
+ return new String(val);
+ }
+ }
+
+ /**
+ * Write the start code (timestamp) to backup system table. If passed in null, then write 0 byte.
+ * @param startCode start code
+ * @param backupRoot root directory path to backup
+ * @throws IOException exception
+ */
+ public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("write backup start code to backup system table " + startCode);
+ }
+ try (Table table = connection.getTable(tableName)) {
+ Put put = createPutForStartCode(startCode.toString(), backupRoot);
+ table.put(put);
+ }
+ }
+
+ /**
+ * Get the Region Servers log information after the last log roll from backup system table.
+ * @param backupRoot root directory path to backup
+ * @return RS log info
+ * @throws IOException exception
+ */
+ public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot)
+ throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("read region server last roll log result to backup system table");
+ }
+
+ Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot);
+
+ try (Table table = connection.getTable(tableName);
+ ResultScanner scanner = table.getScanner(scan)) {
+ Result res = null;
+ HashMap<String, Long> rsTimestampMap = new HashMap<String, Long>();
+ while ((res = scanner.next()) != null) {
+ res.advance();
+ Cell cell = res.current();
+ byte[] row = CellUtil.cloneRow(cell);
+ String server =
+ getServerNameForReadRegionServerLastLogRollResult(row);
+ byte[] data = CellUtil.cloneValue(cell);
+ rsTimestampMap.put(server, Bytes.toLong(data));
+ }
+ return rsTimestampMap;
+ }
+ }
+
+ /**
+ * Writes Region Server last roll log result (timestamp) to backup system table table
+ * @param server Region Server name
+ * @param ts last log timestamp
+ * @param backupRoot root directory path to backup
+ * @throws IOException exception
+ */
+ public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot)
+ throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("write region server last roll log result to backup system table");
+ }
+ try (Table table = connection.getTable(tableName)) {
+ Put put =
+ createPutForRegionServerLastLogRollResult(server, ts, backupRoot);
+ table.put(put);
+ }
+ }
+
+ /**
+ * Get all completed backup information (in desc order by time)
+ * @param onlyCompleted true, if only successfully completed sessions
+ * @return history info of BackupCompleteData
+ * @throws IOException exception
+ */
+ public ArrayList<BackupInfo> getBackupHistory(boolean onlyCompleted) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("get backup history from backup system table");
+ }
+ ArrayList<BackupInfo> list;
+ BackupState state = onlyCompleted ? BackupState.COMPLETE : BackupState.ANY;
+ list = getBackupInfos(state);
+ return BackupUtils.sortHistoryListDesc(list);
+ }
+
+ /**
+ * Get all backups history
+ * @return list of backup info
+ * @throws IOException
+ */
+ public List<BackupInfo> getBackupHistory() throws IOException {
+ return getBackupHistory(false);
+ }
+
+ /**
+ * Get first n backup history records
+ * @param n number of records
+ * @return list of records
+ * @throws IOException
+ */
+ public List<BackupInfo> getHistory(int n) throws IOException {
+
+ List<BackupInfo> history = getBackupHistory();
+ if (history.size() <= n) return history;
+ List<BackupInfo> list = new ArrayList<BackupInfo>();
+ for (int i = 0; i < n; i++) {
+ list.add(history.get(i));
+ }
+ return list;
+
+ }
+
+ /**
+ * Get backup history records filtered by list of filters.
+ * @param n max number of records
+ * @param filters list of filters
+ * @return backup records
+ * @throws IOException
+ */
+ public List<BackupInfo> getBackupHistory(int n, BackupInfo.Filter... filters) throws IOException {
+ if (filters.length == 0) return getHistory(n);
+
+ List<BackupInfo> history = getBackupHistory();
+ List<BackupInfo> result = new ArrayList<BackupInfo>();
+ for (BackupInfo bi : history) {
+ if (result.size() == n) break;
+ boolean passed = true;
+ for (int i = 0; i < filters.length; i++) {
+ if (!filters[i].apply(bi)) {
+ passed = false;
+ break;
+ }
+ }
+ if (passed) {
+ result.add(bi);
+ }
+ }
+ return result;
+
+ }
+
+ /**
+ * Get history for backup destination
+ * @param backupRoot backup destination path
+ * @return List of backup info
+ * @throws IOException
+ */
+ public List<BackupInfo> getBackupHistory(String backupRoot) throws IOException {
+ ArrayList<BackupInfo> history = getBackupHistory(false);
+ for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) {
+ BackupInfo info = iterator.next();
+ if (!backupRoot.equals(info.getBackupRootDir())) {
+ iterator.remove();
+ }
+ }
+ return history;
+ }
+
+ /**
+ * Get history for a table
+ * @param name table name
+ * @return history for a table
+ * @throws IOException
+ */
+ public List<BackupInfo> getBackupHistoryForTable(TableName name) throws IOException {
+ List<BackupInfo> history = getBackupHistory();
+ List<BackupInfo> tableHistory = new ArrayList<BackupInfo>();
+ for (BackupInfo info : history) {
+ List<TableName> tables = info.getTableNames();
+ if (tables.contains(name)) {
+ tableHistory.add(info);
+ }
+ }
+ return tableHistory;
+ }
+
+ public Map<TableName, ArrayList<BackupInfo>> getBackupHistoryForTableSet(Set<TableName> set,
+ String backupRoot) throws IOException {
+ List<BackupInfo> history = getBackupHistory(backupRoot);
+ Map<TableName, ArrayList<BackupInfo>> tableHistoryMap =
+ new HashMap<TableName, ArrayList<BackupInfo>>();
+ for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) {
+ BackupInfo info = iterator.next();
+ if (!backupRoot.equals(info.getBackupRootDir())) {
+ continue;
+ }
+ List<TableName> tables = info.getTableNames();
+ for (TableName tableName : tables) {
+ if (set.contains(tableName)) {
+ ArrayList<BackupInfo> list = tableHistoryMap.get(tableName);
+ if (list == null) {
+ list = new ArrayList<BackupInfo>();
+ tableHistoryMap.put(tableName, list);
+ }
+ list.add(info);
+ }
+ }
+ }
+ return tableHistoryMap;
+ }
+
+ /**
+ * Get all backup sessions with a given state (in descending order by time)
+ * @param state backup session state
+ * @return history info of backup info objects
+ * @throws IOException exception
+ */
+ public ArrayList<BackupInfo> getBackupInfos(BackupState state) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("get backup infos from backup system table");
+ }
+
+ Scan scan = createScanForBackupHistory();
+ ArrayList<BackupInfo> list = new ArrayList<BackupInfo>();
+
+ try (Table table = connection.getTable(tableName);
+ ResultScanner scanner = table.getScanner(scan)) {
+ Result res = null;
+ while ((res = scanner.next()) != null) {
+ res.advance();
+ BackupInfo context = cellToBackupInfo(res.current());
+ if (state != BackupState.ANY && context.getState() != state) {
+ continue;
+ }
+ list.add(context);
+ }
+ return list;
+ }
+ }
+
+ /**
+ * Write the current timestamps for each regionserver to backup system table after a successful
+ * full or incremental backup. The saved timestamp is of the last log file that was backed up
+ * already.
+ * @param tables tables
+ * @param newTimestamps timestamps
+ * @param backupRoot root directory path to backup
+ * @throws IOException exception
+ */
+ public void writeRegionServerLogTimestamp(Set<TableName> tables,
+ HashMap<String, Long> newTimestamps, String backupRoot) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("write RS log time stamps to backup system table for tables ["
+ + StringUtils.join(tables, ",") + "]");
+ }
+ List<Put> puts = new ArrayList<Put>();
+ for (TableName table : tables) {
+ byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray();
+ Put put =
+ createPutForWriteRegionServerLogTimestamp(table, smapData,
+ backupRoot);
+ puts.add(put);
+ }
+ try (Table table = connection.getTable(tableName)) {
+ table.put(puts);
+ }
+ }
+
+ /**
+ * Read the timestamp for each region server log after the last successful backup. Each table has
+ * its own set of the timestamps. The info is stored for each table as a concatenated string of
+ * rs->timestapmp
+ * @param backupRoot root directory path to backup
+ * @return the timestamp for each region server. key: tableName value:
+ * RegionServer,PreviousTimeStamp
+ * @throws IOException exception
+ */
+ public HashMap<TableName, HashMap<String, Long>> readLogTimestampMap(String backupRoot)
+ throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("read RS log ts from backup system table for root=" + backupRoot);
+ }
+
+ HashMap<TableName, HashMap<String, Long>> tableTimestampMap =
+ new HashMap<TableName, HashMap<String, Long>>();
+
+ Scan scan = createScanForReadLogTimestampMap(backupRoot);
+ try (Table table = connection.getTable(tableName);
+ ResultScanner scanner = table.getScanner(scan)) {
+ Result res = null;
+ while ((res = scanner.next()) != null) {
+ res.advance();
+ Cell cell = res.current();
+ byte[] row = CellUtil.cloneRow(cell);
+ String tabName = getTableNameForReadLogTimestampMap(row);
+ TableName tn = TableName.valueOf(tabName);
+ byte[] data = CellUtil.cloneValue(cell);
+ if (data == null) {
+ throw new IOException("Data of last backup data from backup system table "
+ + "is empty. Create a backup first.");
+ }
+ if (data != null && data.length > 0) {
+ HashMap<String, Long> lastBackup =
+ fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data));
+ tableTimestampMap.put(tn, lastBackup);
+ }
+ }
+ return tableTimestampMap;
+ }
+ }
+
+ private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table,
+ Map<String, Long> map) {
+ BackupProtos.TableServerTimestamp.Builder tstBuilder =
+ BackupProtos.TableServerTimestamp.newBuilder();
+ tstBuilder.setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil
+ .toProtoTableName(table));
+
+ for (Entry<String, Long> entry : map.entrySet()) {
+ BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder();
+ HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder();
+ ServerName sn = ServerName.parseServerName(entry.getKey());
+ snBuilder.setHostName(sn.getHostname());
+ snBuilder.setPort(sn.getPort());
+ builder.setServerName(snBuilder.build());
+ builder.setTimestamp(entry.getValue());
+ tstBuilder.addServerTimestamp(builder.build());
+ }
+
+ return tstBuilder.build();
+ }
+
+ private HashMap<String, Long> fromTableServerTimestampProto(
+ BackupProtos.TableServerTimestamp proto) {
+ HashMap<String, Long> map = new HashMap<String, Long>();
+ List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList();
+ for (BackupProtos.ServerTimestamp st : list) {
+ ServerName sn =
+ org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(st.getServerName());
+ map.put(sn.getHostname() + ":" + sn.getPort(), st.getTimestamp());
+ }
+ return map;
+ }
+
+ /**
+ * Return the current tables covered by incremental backup.
+ * @param backupRoot root directory path to backup
+ * @return set of tableNames
+ * @throws IOException exception
+ */
+ public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("get incremental backup table set from backup system table");
+ }
+ TreeSet<TableName> set = new TreeSet<>();
+
+ try (Table table = connection.getTable(tableName)) {
+ Get get = createGetForIncrBackupTableSet(backupRoot);
+ Result res = table.get(get);
+ if (res.isEmpty()) {
+ return set;
+ }
+ List<Cell> cells = res.listCells();
+ for (Cell cell : cells) {
+ // qualifier = table name - we use table names as qualifiers
+ set.add(TableName.valueOf(CellUtil.cloneQualifier(cell)));
+ }
+ return set;
+ }
+ }
+
+ /**
+ * Add tables to global incremental backup set
+ * @param tables set of tables
+ * @param backupRoot root directory path to backup
+ * @throws IOException exception
+ */
+ public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot)
+ throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot
+ + " tables [" + StringUtils.join(tables, " ") + "]");
+ for (TableName table : tables) {
+ LOG.debug(table);
+ }
+ }
+ try (Table table = connection.getTable(tableName)) {
+ Put put = createPutForIncrBackupTableSet(tables, backupRoot);
+ table.put(put);
+ }
+ }
+
+ /**
+ * Deletes incremental backup set for a backup destination
+ * @param backupRoot backup root
+ */
+
+ public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Delete incremental backup table set to backup system table. ROOT=" + backupRoot);
+ }
+ try (Table table = connection.getTable(tableName)) {
+ Delete delete = createDeleteForIncrBackupTableSet(backupRoot);
+ table.delete(delete);
+ }
+ }
+
+ /**
+ * Register WAL files as eligible for deletion
+ * @param files files
+ * @param backupId backup id
+ * @param backupRoot root directory path to backup destination
+ * @throws IOException exception
+ */
+ public void addWALFiles(List<String> files, String backupId, String backupRoot)
+ throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("add WAL files to backup system table: " + backupId + " " + backupRoot + " files ["
+ + StringUtils.join(files, ",") + "]");
+ for (String f : files) {
+ LOG.debug("add :" + f);
+ }
+ }
+ try (Table table = connection.getTable(tableName)) {
+ List<Put> puts =
+ createPutsForAddWALFiles(files, backupId, backupRoot);
+ table.put(puts);
+ }
+ }
+
+ /**
+ * Register WAL files as eligible for deletion
+ * @param backupRoot root directory path to backup
+ * @throws IOException exception
+ */
+ public Iterator<WALItem> getWALFilesIterator(String backupRoot) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("get WAL files from backup system table");
+ }
+ final Table table = connection.getTable(tableName);
+ Scan scan = createScanForGetWALs(backupRoot);
+ final ResultScanner scanner = table.getScanner(scan);
+ final Iterator<Result> it = scanner.iterator();
+ return new Iterator<WALItem>() {
+
+ @Override
+ public boolean hasNext() {
+ boolean next = it.hasNext();
+ if (!next) {
+ // close all
+ try {
+ scanner.close();
+ table.close();
+ } catch (IOException e) {
+ LOG.error("Close WAL Iterator", e);
+ }
+ }
+ return next;
+ }
+
+ @Override
+ public WALItem next() {
+ Result next = it.next();
+ List<Cell> cells = next.listCells();
+ byte[] buf = cells.get(0).getValueArray();
+ int len = cells.get(0).getValueLength();
+ int offset = cells.get(0).getValueOffset();
+ String backupId = new String(buf, offset, len);
+ buf = cells.get(1).getValueArray();
+ len = cells.get(1).getValueLength();
+ offset = cells.get(1).getValueOffset();
+ String walFile = new String(buf, offset, len);
+ buf = cells.get(2).getValueArray();
+ len = cells.get(2).getValueLength();
+ offset = cells.get(2).getValueOffset();
+ String backupRoot = new String(buf, offset, len);
+ return new WALItem(backupId, walFile, backupRoot);
+ }
+
+ @Override
+ public void remove() {
+ // not implemented
+ throw new RuntimeException("remove is not supported");
+ }
+ };
+
+ }
+
+ /**
+ * Check if WAL file is eligible for deletion Future: to support all backup destinations
+ * @param file name of a file to check
+ * @return true, if deletable, false otherwise.
+ * @throws IOException exception
+ */
+ public boolean isWALFileDeletable(String file) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Check if WAL file has been already backed up in backup system table " + file);
+ }
+ try (Table table = connection.getTable(tableName)) {
+ Get get = createGetForCheckWALFile(file);
+ Result res = table.get(get);
+ if (res.isEmpty()) {
+ return false;
+ }
+ return true;
+ }
+ }
+
+ /**
+ * Checks if we have at least one backup session in backup system table This API is used by
+ * BackupLogCleaner
+ * @return true, if - at least one session exists in backup system table table
+ * @throws IOException exception
+ */
+ public boolean hasBackupSessions() throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Has backup sessions from backup system table");
+ }
+ boolean result = false;
+ Scan scan = createScanForBackupHistory();
+ scan.setCaching(1);
+ try (Table table = connection.getTable(tableName);
+ ResultScanner scanner = table.getScanner(scan)) {
+ if (scanner.next() != null) {
+ result = true;
+ }
+ return result;
+ }
+ }
+
+ /**
+ * BACKUP SETS
+ */
+
+ /**
+ * Get backup set list
+ * @return backup set list
+ * @throws IOException
+ */
+ public List<String> listBackupSets() throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(" Backup set list");
+ }
+ List<String> list = new ArrayList<String>();
+ Table table = null;
+ ResultScanner scanner = null;
+ try {
+ table = connection.getTable(tableName);
+ Scan scan = createScanForBackupSetList();
+ scan.setMaxVersions(1);
+ scanner = table.getScanner(scan);
+ Result res = null;
+ while ((res = scanner.next()) != null) {
+ res.advance();
+ list.add(cellKeyToBackupSetName(res.current()));
+ }
+ return list;
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ /**
+ * Get backup set description (list of tables)
+ * @param name set's name
+ * @return list of tables in a backup set
+ * @throws IOException
+ */
+ public List<TableName> describeBackupSet(String name) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(" Backup set describe: " + name);
+ }
+ Table table = null;
+ try {
+ table = connection.getTable(tableName);
+ Get get = createGetForBackupSet(name);
+ Result res = table.get(get);
+ if (res.isEmpty()) return null;
+ res.advance();
+ String[] tables = cellValueToBackupSet(res.current());
+ return toList(tables);
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ private List<TableName> toList(String[] tables) {
+ List<TableName> list = new ArrayList<TableName>(tables.length);
+ for (String name : tables) {
+ list.add(TableName.valueOf(name));
+ }
+ return list;
+ }
+
+ /**
+ * Add backup set (list of tables)
+ * @param name set name
+ * @param newTables list of tables, comma-separated
+ * @throws IOException
+ */
+ public void addToBackupSet(String name, String[] newTables) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]");
+ }
+ Table table = null;
+ String[] union = null;
+ try {
+ table = connection.getTable(tableName);
+ Get get = createGetForBackupSet(name);
+ Result res = table.get(get);
+ if (res.isEmpty()) {
+ union = newTables;
+ } else {
+ res.advance();
+ String[] tables = cellValueToBackupSet(res.current());
+ union = merge(tables, newTables);
+ }
+ Put put = createPutForBackupSet(name, union);
+ table.put(put);
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ private String[] merge(String[] tables, String[] newTables) {
+ List<String> list = new ArrayList<String>();
+ // Add all from tables
+ for (String t : tables) {
+ list.add(t);
+ }
+ for (String nt : newTables) {
+ if (list.contains(nt)) continue;
+ list.add(nt);
+ }
+ String[] arr = new String[list.size()];
+ list.toArray(arr);
+ return arr;
+ }
+
+ /**
+ * Remove tables from backup set (list of tables)
+ * @param name set name
+ * @param toRemove list of tables
+ * @throws IOException
+ */
+ public void removeFromBackupSet(String name, String[] toRemove) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(" Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ")
+ + "]");
+ }
+ Table table = null;
+ String[] disjoint = null;
+ String[] tables = null;
+ try {
+ table = connection.getTable(tableName);
+ Get get = createGetForBackupSet(name);
+ Result res = table.get(get);
+ if (res.isEmpty()) {
+ LOG.warn("Backup set '" + name + "' not found.");
+ return;
+ } else {
+ res.advance();
+ tables = cellValueToBackupSet(res.current());
+ disjoint = disjoin(tables, toRemove);
+ }
+ if (disjoint.length > 0 && disjoint.length != tables.length) {
+ Put put = createPutForBackupSet(name, disjoint);
+ table.put(put);
+ } else if(disjoint.length == tables.length) {
+ LOG.warn("Backup set '" + name + "' does not contain tables ["
+ + StringUtils.join(toRemove, " ") + "]");
+ } else { // disjoint.length == 0 and tables.length >0
+ // Delete backup set
+ LOG.info("Backup set '"+name+"' is empty. Deleting.");
+ deleteBackupSet(name);
+ }
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ private String[] disjoin(String[] tables, String[] toRemove) {
+ List<String> list = new ArrayList<String>();
+ // Add all from tables
+ for (String t : tables) {
+ list.add(t);
+ }
+ for (String nt : toRemove) {
+ if (list.contains(nt)) {
+ list.remove(nt);
+ }
+ }
+ String[] arr = new String[list.size()];
+ list.toArray(arr);
+ return arr;
+ }
+
+ /**
+ * Delete backup set
+ * @param name set's name
+ * @throws IOException
+ */
+ public void deleteBackupSet(String name) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(" Backup set delete: " + name);
+ }
+ Table table = null;
+ try {
+ table = connection.getTable(tableName);
+ Delete del = createDeleteForBackupSet(name);
+ table.delete(del);
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ /**
+ * Get backup system table descriptor
+ * @return table's descriptor
+ */
+ public static HTableDescriptor getSystemTableDescriptor(Configuration conf) {
+
+ HTableDescriptor tableDesc = new HTableDescriptor(getTableName(conf));
+ HColumnDescriptor colSessionsDesc = new HColumnDescriptor(SESSIONS_FAMILY);
+ colSessionsDesc.setMaxVersions(1);
+ // Time to keep backup sessions (secs)
+ Configuration config = HBaseConfiguration.create();
+ int ttl =
+ config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
+ BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
+ colSessionsDesc.setTimeToLive(ttl);
+ tableDesc.addFamily(colSessionsDesc);
+ HColumnDescriptor colMetaDesc = new HColumnDescriptor(META_FAMILY);
+ tableDesc.addFamily(colMetaDesc);
+ return tableDesc;
+ }
+
+ public static TableName getTableName(Configuration conf) {
+ String name =
+ conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
+ BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT);
+ return TableName.valueOf(name);
+ }
+
+ public static String getTableNameAsString(Configuration conf) {
+ return getTableName(conf).getNameAsString();
+ }
+
+
+
+
+
+ /**
+ * Creates Put operation for a given backup info object
+ * @param context backup info
+ * @return put operation
+ * @throws IOException exception
+ */
+ private Put createPutForBackupInfo(BackupInfo context) throws IOException {
+ Put put = new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId()));
+ put.addColumn(BackupSystemTable.SESSIONS_FAMILY, Bytes.toBytes("context"),
+ context.toByteArray());
+ return put;
+ }
+
+ /**
+ * Creates Get operation for a given backup id
+ * @param backupId backup's ID
+ * @return get operation
+ * @throws IOException exception
+ */
+ private Get createGetForBackupInfo(String backupId) throws IOException {
+ Get get = new Get(rowkey(BACKUP_INFO_PREFIX, backupId));
+ get.addFamily(BackupSystemTable.SESSIONS_FAMILY);
+ get.setMaxVersions(1);
+ return get;
+ }
+
+ /**
+ * Creates Delete operation for a given backup id
+ * @param backupId backup's ID
+ * @return delete operation
+ * @throws IOException exception
+ */
+ private Delete createDeleteForBackupInfo(String backupId) {
+ Delete del = new Delete(rowkey(BACKUP_INFO_PREFIX, backupId));
+ del.addFamily(BackupSystemTable.SESSIONS_FAMILY);
+ return del;
+ }
+
+ /**
+ * Converts Result to BackupInfo
+ * @param res HBase result
+ * @return backup info instance
+ * @throws IOException exception
+ */
+ private BackupInfo resultToBackupInfo(Result res) throws IOException {
+ res.advance();
+ Cell cell = res.current();
+ return cellToBackupInfo(cell);
+ }
+
+ /**
+ * Creates Get operation to retrieve start code from backup system table
+ * @return get operation
+ * @throws IOException exception
+ */
+ private Get createGetForStartCode(String rootPath) throws IOException {
+ Get get = new Get(rowkey(START_CODE_ROW, rootPath));
+ get.addFamily(BackupSystemTable.META_FAMILY);
+ get.setMaxVersions(1);
+ return get;
+ }
+
+ /**
+ * Creates Put operation to store start code to backup system table
+ * @return put operation
+ * @throws IOException exception
+ */
+ private Put createPutForStartCode(String startCode, String rootPath) {
+ Put put = new Put(rowkey(START_CODE_ROW, rootPath));
+ put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("startcode"),
+ Bytes.toBytes(startCode));
+ return put;
+ }
+
+ /**
+ * Creates Get to retrieve incremental backup table set from backup system table
+ * @return get operation
+ * @throws IOException exception
+ */
+ private Get createGetForIncrBackupTableSet(String backupRoot) throws IOException {
+ Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot));
+ get.addFamily(BackupSystemTable.META_FAMILY);
+ get.setMaxVersions(1);
+ return get;
+ }
+
+ /**
+ * Creates Put to store incremental backup table set
+ * @param tables tables
+ * @return put operation
+ */
+ private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) {
+ Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot));
+ for (TableName table : tables) {
+ put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()),
+ EMPTY_VALUE);
+ }
+ return put;
+ }
+
+ /**
+ * Creates Delete for incremental backup table set
+ * @param backupRoot backup root
+ * @return delete operation
+ */
+ private Delete createDeleteForIncrBackupTableSet(String backupRoot) {
+ Delete delete = new Delete(rowkey(INCR_BACKUP_SET, backupRoot));
+ delete.addFamily(BackupSystemTable.META_FAMILY);
+ return delete;
+ }
+
+ /**
+ * Creates Scan operation to load backup history
+ * @return scan operation
+ */
+ private Scan createScanForBackupHistory() {
+ Scan scan = new Scan();
+ byte[] startRow = Bytes.toBytes(BACKUP_INFO_PREFIX);
+ byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+ stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+ scan.setStartRow(startRow);
+ scan.setStopRow(stopRow);
+ scan.addFamily(BackupSystemTable.SESSIONS_FAMILY);
+ scan.setMaxVersions(1);
+ return scan;
+ }
+
+ /**
+ * Converts cell to backup info instance.
+ * @param current current cell
+ * @return backup backup info instance
+ * @throws IOException exception
+ */
+ private BackupInfo cellToBackupInfo(Cell current) throws IOException {
+ byte[] data = CellUtil.cloneValue(current);
+ return BackupInfo.fromByteArray(data);
+ }
+
+ /**
+ * Creates Put to write RS last roll log timestamp map
+ * @param table table
+ * @param smap map, containing RS:ts
+ * @return put operation
+ */
+ private Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap,
+ String backupRoot) {
+ Put put = new Put(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString()));
+ put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("log-roll-map"), smap);
+ return put;
+ }
+
+ /**
+ * Creates Scan to load table-> { RS -> ts} map of maps
+ * @return scan operation
+ */
+ private Scan createScanForReadLogTimestampMap(String backupRoot) {
+ Scan scan = new Scan();
+ byte[] startRow = rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot);
+ byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+ stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+ scan.setStartRow(startRow);
+ scan.setStopRow(stopRow);
+ scan.addFamily(BackupSystemTable.META_FAMILY);
+
+ return scan;
+ }
+
+ /**
+ * Get table name from rowkey
+ * @param cloneRow rowkey
+ * @return table name
+ */
+ private String getTableNameForReadLogTimestampMap(byte[] cloneRow) {
+ String s = Bytes.toString(cloneRow);
+ int index = s.lastIndexOf(NULL);
+ return s.substring(index + 1);
+ }
+
+ /**
+ * Creates Put to store RS last log result
+ * @param server server name
+ * @param timestamp log roll result (timestamp)
+ * @return put operation
+ */
+ private Put createPutForRegionServerLastLogRollResult(String server, Long timestamp,
+ String backupRoot) {
+ Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server));
+ put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("rs-log-ts"),
+ Bytes.toBytes(timestamp));
+ return put;
+ }
+
+ /**
+ * Creates Scan operation to load last RS log roll results
+ * @return scan operation
+ */
+ private Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) {
+ Scan scan = new Scan();
+ byte[] startRow = rowkey(RS_LOG_TS_PREFIX, backupRoot);
+ byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+ stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+ scan.setStartRow(startRow);
+ scan.setStopRow(stopRow);
+ scan.addFamily(BackupSystemTable.META_FAMILY);
+ scan.setMaxVersions(1);
+
+ return scan;
+ }
+
+ /**
+ * Get server's name from rowkey
+ * @param row rowkey
+ * @return server's name
+ */
+ private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) {
+ String s = Bytes.toString(row);
+ int index = s.lastIndexOf(NULL);
+ return s.substring(index + 1);
+ }
+
+ /**
+ * Creates put list for list of WAL files
+ * @param files list of WAL file paths
+ * @param backupId backup id
+ * @return put list
+ * @throws IOException exception
+ */
+ private List<Put> createPutsForAddWALFiles(List<String> files, String backupId,
+ String backupRoot) throws IOException {
+
+ List<Put> puts = new ArrayList<Put>();
+ for (String file : files) {
+ Put put = new Put(rowkey(WALS_PREFIX, BackupUtils.getUniqueWALFileNamePart(file)));
+ put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("backupId"),
+ Bytes.toBytes(backupId));
+ put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("file"), Bytes.toBytes(file));
+ put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("root"), Bytes.toBytes(backupRoot));
+ puts.add(put);
+ }
+ return puts;
+ }
+
+ /**
+ * Creates Scan operation to load WALs
+ * @param backupRoot path to backup destination
+ * @return scan operation
+ */
+ private Scan createScanForGetWALs(String backupRoot) {
+ // TODO: support for backupRoot
+ Scan scan = new Scan();
+ byte[] startRow = Bytes.toBytes(WALS_PREFIX);
+ byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+ stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+ scan.setStartRow(startRow);
+ scan.setStopRow(stopRow);
+ scan.addFamily(BackupSystemTable.META_FAMILY);
+ return scan;
+ }
+
+ /**
+ * Creates Get operation for a given wal file name TODO: support for backup destination
+ * @param file file
+ * @return get operation
+ * @throws IOException exception
+ */
+ private Get createGetForCheckWALFile(String file) throws IOException {
+ Get get = new Get(rowkey(WALS_PREFIX, BackupUtils.getUniqueWALFileNamePart(file)));
+ // add backup root column
+ get.addFamily(BackupSystemTable.META_FAMILY);
+ return get;
+ }
+
+ /**
+ * Creates Scan operation to load backup set list
+ * @return scan operation
+ */
+ private Scan createScanForBackupSetList() {
+ Scan scan = new Scan();
+ byte[] startRow = Bytes.toBytes(SET_KEY_PREFIX);
+ byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+ stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+ scan.setStartRow(startRow);
+ scan.setStopRow(stopRow);
+ scan.addFamily(BackupSystemTable.META_FAMILY);
+ return scan;
+ }
+
+ /**
+ * Creates Get operation to load backup set content
+ * @return get operation
+ */
+ private Get createGetForBackupSet(String name) {
+ Get get = new Get(rowkey(SET_KEY_PREFIX, name));
+ get.addFamily(BackupSystemTable.META_FAMILY);
+ return get;
+ }
+
+ /**
+ * Creates Delete operation to delete backup set content
+ * @param name backup set's name
+ * @return delete operation
+ */
+ private Delete createDeleteForBackupSet(String name) {
+ Delete del = new Delete(rowkey(SET_KEY_PREFIX, name));
+ del.addFamily(BackupSystemTable.META_FAMILY);
+ return del;
+ }
+
+ /**
+ * Creates Put operation to update backup set content
+ * @param name backup set's name
+ * @param tables list of tables
+ * @return put operation
+ */
+ private Put createPutForBackupSet(String name, String[] tables) {
+ Put put = new Put(rowkey(SET_KEY_PREFIX, name));
+ byte[] value = convertToByteArray(tables);
+ put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("tables"), value);
+ return put;
+ }
+
+ private byte[] convertToByteArray(String[] tables) {
+ return StringUtils.join(tables, ",").getBytes();
+ }
+
+ /**
+ * Converts cell to backup set list.
+ * @param current current cell
+ * @return backup set as array of table names
+ * @throws IOException
+ */
+ private String[] cellValueToBackupSet(Cell current) throws IOException {
+ byte[] data = CellUtil.cloneValue(current);
+ if (data != null && data.length > 0) {
+ return Bytes.toString(data).split(",");
+ } else {
+ return new String[0];
+ }
+ }
+
+ /**
+ * Converts cell key to backup set name.
+ * @param current current cell
+ * @return backup set name
+ * @throws IOException
+ */
+ private String cellKeyToBackupSetName(Cell current) throws IOException {
+ byte[] data = CellUtil.cloneRow(current);
+ return Bytes.toString(data).substring(SET_KEY_PREFIX.length());
+ }
+
+ private byte[] rowkey(String s, String... other) {
+ StringBuilder sb = new StringBuilder(s);
+ for (String ss : other) {
+ sb.append(ss);
+ }
+ return sb.toString().getBytes();
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3aaea8e0/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
new file mode 100644
index 0000000..77d1184
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupCopyJob;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
+import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
+import org.apache.hadoop.hbase.backup.BackupRequest;
+import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Full table backup implementation
+ *
+ */
+@InterfaceAudience.Private
+public class FullTableBackupClient extends TableBackupClient {
+ private static final Log LOG = LogFactory.getLog(FullTableBackupClient.class);
+
+ public FullTableBackupClient(final Connection conn, final String backupId, BackupRequest request)
+ throws IOException {
+ super(conn, backupId, request);
+ }
+
+ /**
+ * Do snapshot copy.
+ * @param backupInfo backup info
+ * @throws Exception exception
+ */
+ private void snapshotCopy(BackupInfo backupInfo) throws Exception {
+ LOG.info("Snapshot copy is starting.");
+
+ // set overall backup phase: snapshot_copy
+ backupInfo.setPhase(BackupPhase.SNAPSHOTCOPY);
+
+ // call ExportSnapshot to copy files based on hbase snapshot for backup
+ // ExportSnapshot only support single snapshot export, need loop for multiple tables case
+ BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
+
+ // number of snapshots matches number of tables
+ float numOfSnapshots = backupInfo.getSnapshotNames().size();
+
+ LOG.debug("There are " + (int) numOfSnapshots + " snapshots to be copied.");
+
+ for (TableName table : backupInfo.getTables()) {
+ // Currently we simply set the sub copy tasks by counting the table snapshot number, we can
+ // calculate the real files' size for the percentage in the future.
+ // backupCopier.setSubTaskPercntgInWholeTask(1f / numOfSnapshots);
+ int res = 0;
+ String[] args = new String[4];
+ args[0] = "-snapshot";
+ args[1] = backupInfo.getSnapshotName(table);
+ args[2] = "-copy-to";
+ args[3] = backupInfo.getTableBackupDir(table);
+
+ LOG.debug("Copy snapshot " + args[1] + " to " + args[3]);
+ res = copyService.copy(backupInfo, backupManager, conf, BackupType.FULL, args);
+ // if one snapshot export failed, do not continue for remained snapshots
+ if (res != 0) {
+ LOG.error("Exporting Snapshot " + args[1] + " failed with return code: " + res + ".");
+
+ throw new IOException("Failed of exporting snapshot " + args[1] + " to " + args[3]
+ + " with reason code " + res);
+ }
+ LOG.info("Snapshot copy " + args[1] + " finished.");
+ }
+ }
+
+ /**
+ * Backup request execution
+ * @throws IOException
+ */
+ @Override
+ public void execute() throws IOException {
+
+ try (Admin admin = conn.getAdmin();) {
+
+ // Begin BACKUP
+ beginBackup(backupManager, backupInfo);
+ String savedStartCode = null;
+ boolean firstBackup = false;
+ // do snapshot for full table backup
+
+ savedStartCode = backupManager.readBackupStartCode();
+ firstBackup = savedStartCode == null || Long.parseLong(savedStartCode) == 0L;
+ if (firstBackup) {
+ // This is our first backup. Let's put some marker to system table so that we can hold the logs
+ // while we do the backup.
+ backupManager.writeBackupStartCode(0L);
+ }
+ // We roll log here before we do the snapshot. It is possible there is duplicate data
+ // in the log that is already in the snapshot. But if we do it after the snapshot, we
+ // could have data loss.
+ // A better approach is to do the roll log on each RS in the same global procedure as
+ // the snapshot.
+ LOG.info("Execute roll log procedure for full backup ...");
+
+ Map<String, String> props = new HashMap<String, String>();
+ props.put("backupRoot", backupInfo.getBackupRootDir());
+ admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
+ LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
+
+ newTimestamps = backupManager.readRegionServerLastLogRollResult();
+ if (firstBackup) {
+ // Updates registered log files
+ // We record ALL old WAL files as registered, because
+ // this is a first full backup in the system and these
+ // files are not needed for next incremental backup
+ List<String> logFiles = BackupUtils.getWALFilesOlderThan(conf, newTimestamps);
+ backupManager.recordWALFiles(logFiles);
+ }
+
+ // SNAPSHOT_TABLES:
+ backupInfo.setPhase(BackupPhase.SNAPSHOT);
+ for (TableName tableName : tableList) {
+ String snapshotName =
+ "snapshot_" + Long.toString(EnvironmentEdgeManager.currentTime()) + "_"
+ + tableName.getNamespaceAsString() + "_" + tableName.getQualifierAsString();
+
+ admin.snapshot(snapshotName, tableName);
+
+ backupInfo.setSnapshotName(tableName, snapshotName);
+ }
+
+ // SNAPSHOT_COPY:
+ // do snapshot copy
+ LOG.debug("snapshot copy for " + backupId);
+ snapshotCopy(backupInfo);
+ // Updates incremental backup table set
+ backupManager.addIncrementalBackupTableSet(backupInfo.getTables());
+
+ // BACKUP_COMPLETE:
+ // set overall backup status: complete. Here we make sure to complete the backup.
+ // After this checkpoint, even if entering cancel process, will let the backup finished
+ backupInfo.setState(BackupState.COMPLETE);
+ // The table list in backupInfo is good for both full backup and incremental backup.
+ // For incremental backup, it contains the incremental backup table set.
+ backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
+
+ HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
+ backupManager.readLogTimestampMap();
+
+ Long newStartCode =
+ BackupUtils.getMinValue(BackupUtils
+ .getRSLogTimestampMins(newTableSetTimestampMap));
+ backupManager.writeBackupStartCode(newStartCode);
+
+ // backup complete
+ completeBackup(conn, backupInfo, backupManager, BackupType.FULL, conf);
+ } catch (Exception e) {
+ failBackup(conn, backupInfo, backupManager, e, "Unexpected BackupException : ",
+ BackupType.FULL, conf);
+ throw new IOException(e);
+ }
+
+ }
+
+}