You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2017/08/23 16:47:03 UTC
[06/36] hbase git commit: HBASE-17614: Move Backup/Restore into
separate module (Vladimir Rodionov)
http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
deleted file mode 100644
index 4dab046..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ /dev/null
@@ -1,2051 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.backup.impl;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.NamespaceDescriptor;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.BackupInfo;
-import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
-import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
-import org.apache.hadoop.hbase.backup.BackupType;
-import org.apache.hadoop.hbase.backup.util.BackupUtils;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.SnapshotDescription;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
-
-/**
- * This class provides API to access backup system table<br>
- *
- * Backup system table schema:<br>
- * <p><ul>
- * <li>1. Backup sessions rowkey= "session:"+backupId; value =serialized BackupInfo</li>
- * <li>2. Backup start code rowkey = "startcode:"+backupRoot; value = startcode</li>
- * <li>3. Incremental backup set rowkey="incrbackupset:"+backupRoot; value=[list of tables]</li>
- * <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name;
- * value = map[RS-> last WAL timestamp]</li>
- * <li>5. RS - WAL ts map rowkey="rslogts:"+backupRoot +server; value = last WAL timestamp</li>
- * <li>6. WALs recorded rowkey="wals:"+WAL unique file name;
- * value = backupId and full WAL file name</li>
- * </ul></p>
- */
-
-@InterfaceAudience.Private
-public final class BackupSystemTable implements Closeable {
- private static final Log LOG = LogFactory.getLog(BackupSystemTable.class);
-
- static class WALItem {
- String backupId;
- String walFile;
- String backupRoot;
-
- WALItem(String backupId, String walFile, String backupRoot) {
- this.backupId = backupId;
- this.walFile = walFile;
- this.backupRoot = backupRoot;
- }
-
- public String getBackupId() {
- return backupId;
- }
-
- public String getWalFile() {
- return walFile;
- }
-
- public String getBackupRoot() {
- return backupRoot;
- }
-
- @Override
- public String toString() {
- return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile;
- }
-
- }
-
- private TableName tableName;
- /**
- * Stores backup sessions (contexts)
- */
- final static byte[] SESSIONS_FAMILY = "session".getBytes();
- /**
- * Stores other meta
- */
- final static byte[] META_FAMILY = "meta".getBytes();
- final static byte[] BULK_LOAD_FAMILY = "bulk".getBytes();
- /**
- * Connection to HBase cluster, shared among all instances
- */
- private final Connection connection;
-
- private final static String BACKUP_INFO_PREFIX = "session:";
- private final static String START_CODE_ROW = "startcode:";
- private final static byte[] ACTIVE_SESSION_ROW = "activesession:".getBytes();
- private final static byte[] ACTIVE_SESSION_COL = "c".getBytes();
-
- private final static byte[] ACTIVE_SESSION_YES = "yes".getBytes();
- private final static byte[] ACTIVE_SESSION_NO = "no".getBytes();
-
- private final static String INCR_BACKUP_SET = "incrbackupset:";
- private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
- private final static String RS_LOG_TS_PREFIX = "rslogts:";
-
- private final static String BULK_LOAD_PREFIX = "bulk:";
- private final static byte[] BULK_LOAD_PREFIX_BYTES = BULK_LOAD_PREFIX.getBytes();
- private final static byte[] DELETE_OP_ROW = "delete_op_row".getBytes();
- private final static byte[] MERGE_OP_ROW = "merge_op_row".getBytes();
-
- final static byte[] TBL_COL = Bytes.toBytes("tbl");
- final static byte[] FAM_COL = Bytes.toBytes("fam");
- final static byte[] PATH_COL = Bytes.toBytes("path");
- final static byte[] STATE_COL = Bytes.toBytes("state");
- // the two states a bulk loaded file can be
- final static byte[] BL_PREPARE = Bytes.toBytes("R");
- final static byte[] BL_COMMIT = Bytes.toBytes("D");
-
- private final static String WALS_PREFIX = "wals:";
- private final static String SET_KEY_PREFIX = "backupset:";
-
- // separator between BULK_LOAD_PREFIX and ordinals
- protected final static String BLK_LD_DELIM = ":";
- private final static byte[] EMPTY_VALUE = new byte[] {};
-
- // Safe delimiter in a string
- private final static String NULL = "\u0000";
-
- public BackupSystemTable(Connection conn) throws IOException {
- this.connection = conn;
- tableName = BackupSystemTable.getTableName(conn.getConfiguration());
- checkSystemTable();
- }
-
- private void checkSystemTable() throws IOException {
- try (Admin admin = connection.getAdmin();) {
-
- verifyNamespaceExists(admin);
-
- if (!admin.tableExists(tableName)) {
- HTableDescriptor backupHTD =
- BackupSystemTable.getSystemTableDescriptor(connection.getConfiguration());
- admin.createTable(backupHTD);
- }
- waitForSystemTable(admin);
- }
- }
-
- private void verifyNamespaceExists(Admin admin) throws IOException {
- String namespaceName = tableName.getNamespaceAsString();
- NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build();
- NamespaceDescriptor[] list = admin.listNamespaceDescriptors();
- boolean exists = false;
- for (NamespaceDescriptor nsd : list) {
- if (nsd.getName().equals(ns.getName())) {
- exists = true;
- break;
- }
- }
- if (!exists) {
- admin.createNamespace(ns);
- }
- }
-
- private void waitForSystemTable(Admin admin) throws IOException {
- long TIMEOUT = 60000;
- long startTime = EnvironmentEdgeManager.currentTime();
- while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- }
- if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) {
- throw new IOException("Failed to create backup system table after " + TIMEOUT + "ms");
- }
- }
- LOG.debug("Backup table exists and available");
-
- }
-
- @Override
- public void close() {
- // do nothing
- }
-
- /**
- * Updates status (state) of a backup session in backup system table table
- * @param info backup info
- * @throws IOException exception
- */
- public void updateBackupInfo(BackupInfo info) throws IOException {
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("update backup status in backup system table for: " + info.getBackupId()
- + " set status=" + info.getState());
- }
- try (Table table = connection.getTable(tableName)) {
- Put put = createPutForBackupInfo(info);
- table.put(put);
- }
- }
-
- /*
- * @param backupId the backup Id
- * @return Map of rows to path of bulk loaded hfile
- */
- Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException {
- Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
- try (Table table = connection.getTable(tableName);
- ResultScanner scanner = table.getScanner(scan)) {
- Result res = null;
- Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- while ((res = scanner.next()) != null) {
- res.advance();
- byte[] row = CellUtil.cloneRow(res.listCells().get(0));
- for (Cell cell : res.listCells()) {
- if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
- BackupSystemTable.PATH_COL.length) == 0) {
- map.put(row, Bytes.toString(CellUtil.cloneValue(cell)));
- }
- }
- }
- return map;
- }
- }
-
- /*
- * Used during restore
- * @param backupId the backup Id
- * @param sTableList List of tables
- * @return array of Map of family to List of Paths
- */
- public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<TableName> sTableList)
- throws IOException {
- Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
- Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList == null ? 1 : sTableList.size()];
- try (Table table = connection.getTable(tableName);
- ResultScanner scanner = table.getScanner(scan)) {
- Result res = null;
- while ((res = scanner.next()) != null) {
- res.advance();
- TableName tbl = null;
- byte[] fam = null;
- String path = null;
- for (Cell cell : res.listCells()) {
- if (CellComparator.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0,
- BackupSystemTable.TBL_COL.length) == 0) {
- tbl = TableName.valueOf(CellUtil.cloneValue(cell));
- } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
- BackupSystemTable.FAM_COL.length) == 0) {
- fam = CellUtil.cloneValue(cell);
- } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
- BackupSystemTable.PATH_COL.length) == 0) {
- path = Bytes.toString(CellUtil.cloneValue(cell));
- }
- }
- int srcIdx = IncrementalTableBackupClient.getIndex(tbl, sTableList);
- if (srcIdx == -1) {
- // the table is not among the query
- continue;
- }
- if (mapForSrc[srcIdx] == null) {
- mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- }
- List<Path> files;
- if (!mapForSrc[srcIdx].containsKey(fam)) {
- files = new ArrayList<Path>();
- mapForSrc[srcIdx].put(fam, files);
- } else {
- files = mapForSrc[srcIdx].get(fam);
- }
- files.add(new Path(path));
- if (LOG.isDebugEnabled()) {
- LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path);
- }
- }
- ;
- return mapForSrc;
- }
- }
-
- /*
- * @param map Map of row keys to path of bulk loaded hfile
- */
- void deleteBulkLoadedFiles(Map<byte[], String> map) throws IOException {
- try (Table table = connection.getTable(tableName)) {
- List<Delete> dels = new ArrayList<>();
- for (byte[] row : map.keySet()) {
- dels.add(new Delete(row).addFamily(BackupSystemTable.META_FAMILY));
- }
- table.delete(dels);
- }
- }
-
- /**
- * Deletes backup status from backup system table table
- * @param backupId backup id
- * @throws IOException exception
- */
-
- public void deleteBackupInfo(String backupId) throws IOException {
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("delete backup status in backup system table for " + backupId);
- }
- try (Table table = connection.getTable(tableName)) {
- Delete del = createDeleteForBackupInfo(backupId);
- table.delete(del);
- }
- }
-
- /*
- * For postBulkLoadHFile() hook.
- * @param tabName table name
- * @param region the region receiving hfile
- * @param finalPaths family and associated hfiles
- */
- public void writePathsPostBulkLoad(TableName tabName, byte[] region,
- Map<byte[], List<Path>> finalPaths) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size()
- + " entries");
- }
- try (Table table = connection.getTable(tableName)) {
- List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths);
- table.put(puts);
- LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
- }
- }
-
- /*
- * For preCommitStoreFile() hook
- * @param tabName table name
- * @param region the region receiving hfile
- * @param family column family
- * @param pairs list of paths for hfiles
- */
- public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family,
- final List<Pair<Path, Path>> pairs) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("write bulk load descriptor to backup " + tabName + " with " + pairs.size()
- + " entries");
- }
- try (Table table = connection.getTable(tableName)) {
- List<Put> puts =
- BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs);
- table.put(puts);
- LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
- }
- }
-
- /*
- * Removes rows recording bulk loaded hfiles from backup table
- * @param lst list of table names
- * @param rows the rows to be deleted
- */
- public void removeBulkLoadedRows(List<TableName> lst, List<byte[]> rows) throws IOException {
- try (Table table = connection.getTable(tableName)) {
- List<Delete> lstDels = new ArrayList<>();
- for (byte[] row : rows) {
- Delete del = new Delete(row);
- lstDels.add(del);
- LOG.debug("orig deleting the row: " + Bytes.toString(row));
- }
- table.delete(lstDels);
- LOG.debug("deleted " + rows.size() + " original bulkload rows for " + lst.size() + " tables");
- }
- }
-
- /*
- * Reads the rows from backup table recording bulk loaded hfiles
- * @param tableList list of table names
- * @return The keys of the Map are table, region and column family. Value of the map reflects
- * whether the hfile was recorded by preCommitStoreFile hook (true)
- */
- public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
- readBulkloadRows(List<TableName> tableList) throws IOException {
- Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>();
- List<byte[]> rows = new ArrayList<>();
- for (TableName tTable : tableList) {
- Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable);
- Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable);
- try (Table table = connection.getTable(tableName);
- ResultScanner scanner = table.getScanner(scan)) {
- Result res = null;
- while ((res = scanner.next()) != null) {
- res.advance();
- String fam = null;
- String path = null;
- boolean raw = false;
- byte[] row = null;
- String region = null;
- for (Cell cell : res.listCells()) {
- row = CellUtil.cloneRow(cell);
- rows.add(row);
- String rowStr = Bytes.toString(row);
- region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
- if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
- BackupSystemTable.FAM_COL.length) == 0) {
- fam = Bytes.toString(CellUtil.cloneValue(cell));
- } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
- BackupSystemTable.PATH_COL.length) == 0) {
- path = Bytes.toString(CellUtil.cloneValue(cell));
- } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0,
- BackupSystemTable.STATE_COL.length) == 0) {
- byte[] state = CellUtil.cloneValue(cell);
- if (Bytes.equals(BackupSystemTable.BL_PREPARE, state)) {
- raw = true;
- } else raw = false;
- }
- }
- if (map.get(tTable) == null) {
- map.put(tTable, new HashMap<String, Map<String, List<Pair<String, Boolean>>>>());
- tblMap = map.get(tTable);
- }
- if (tblMap.get(region) == null) {
- tblMap.put(region, new HashMap<String, List<Pair<String, Boolean>>>());
- }
- Map<String, List<Pair<String, Boolean>>> famMap = tblMap.get(region);
- if (famMap.get(fam) == null) {
- famMap.put(fam, new ArrayList<Pair<String, Boolean>>());
- }
- famMap.get(fam).add(new Pair<>(path, raw));
- LOG.debug("found orig " + path + " for " + fam + " of table " + region);
- }
- }
- }
- return new Pair<>(map, rows);
- }
-
- /*
- * @param sTableList List of tables
- * @param maps array of Map of family to List of Paths
- * @param backupId the backup Id
- */
- public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps,
- String backupId) throws IOException {
- try (Table table = connection.getTable(tableName)) {
- long ts = EnvironmentEdgeManager.currentTime();
- int cnt = 0;
- List<Put> puts = new ArrayList<>();
- for (int idx = 0; idx < maps.length; idx++) {
- Map<byte[], List<Path>> map = maps[idx];
- TableName tn = sTableList.get(idx);
- if (map == null) continue;
- for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
- byte[] fam = entry.getKey();
- List<Path> paths = entry.getValue();
- for (Path p : paths) {
- Put put =
- BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId, ts,
- cnt++);
- puts.add(put);
- }
- }
- }
- if (!puts.isEmpty()) {
- table.put(puts);
- }
- }
- }
-
- /**
- * Reads backup status object (instance of backup info) from backup system table table
- * @param backupId backup id
- * @return Current status of backup session or null
- */
-
- public BackupInfo readBackupInfo(String backupId) throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("read backup status from backup system table for: " + backupId);
- }
-
- try (Table table = connection.getTable(tableName)) {
- Get get = createGetForBackupInfo(backupId);
- Result res = table.get(get);
- if (res.isEmpty()) {
- return null;
- }
- return resultToBackupInfo(res);
- }
- }
-
- /**
- * Read the last backup start code (timestamp) of last successful backup. Will return null if
- * there is no start code stored on hbase or the value is of length 0. These two cases indicate
- * there is no successful backup completed so far.
- * @param backupRoot directory path to backup destination
- * @return the timestamp of last successful backup
- * @throws IOException exception
- */
- public String readBackupStartCode(String backupRoot) throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("read backup start code from backup system table");
- }
- try (Table table = connection.getTable(tableName)) {
- Get get = createGetForStartCode(backupRoot);
- Result res = table.get(get);
- if (res.isEmpty()) {
- return null;
- }
- Cell cell = res.listCells().get(0);
- byte[] val = CellUtil.cloneValue(cell);
- if (val.length == 0) {
- return null;
- }
- return new String(val);
- }
- }
-
- /**
- * Write the start code (timestamp) to backup system table. If passed in null, then write 0 byte.
- * @param startCode start code
- * @param backupRoot root directory path to backup
- * @throws IOException exception
- */
- public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("write backup start code to backup system table " + startCode);
- }
- try (Table table = connection.getTable(tableName)) {
- Put put = createPutForStartCode(startCode.toString(), backupRoot);
- table.put(put);
- }
- }
-
- /**
- * Exclusive operations are:
- * create, delete, merge
- * @throws IOException
- */
- public void startBackupExclusiveOperation() throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Start new backup exclusive operation");
- }
- try (Table table = connection.getTable(tableName)) {
- Put put = createPutForStartBackupSession();
- // First try to put if row does not exist
- if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, null, put)) {
- // Row exists, try to put if value == ACTIVE_SESSION_NO
- if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL,
- ACTIVE_SESSION_NO, put)) {
- throw new IOException("There is an active backup exclusive operation");
- }
- }
- }
- }
-
- private Put createPutForStartBackupSession() {
- Put put = new Put(ACTIVE_SESSION_ROW);
- put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES);
- return put;
- }
-
- public void finishBackupExclusiveOperation() throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Finish backup exclusive operation");
- }
- try (Table table = connection.getTable(tableName)) {
- Put put = createPutForStopBackupSession();
- if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL,
- ACTIVE_SESSION_YES, put)) {
- throw new IOException("There is no active backup exclusive operation");
- }
- }
- }
-
- private Put createPutForStopBackupSession() {
- Put put = new Put(ACTIVE_SESSION_ROW);
- put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO);
- return put;
- }
-
- /**
- * Get the Region Servers log information after the last log roll from backup system table.
- * @param backupRoot root directory path to backup
- * @return RS log info
- * @throws IOException exception
- */
- public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot)
- throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("read region server last roll log result to backup system table");
- }
-
- Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot);
-
- try (Table table = connection.getTable(tableName);
- ResultScanner scanner = table.getScanner(scan)) {
- Result res = null;
- HashMap<String, Long> rsTimestampMap = new HashMap<String, Long>();
- while ((res = scanner.next()) != null) {
- res.advance();
- Cell cell = res.current();
- byte[] row = CellUtil.cloneRow(cell);
- String server = getServerNameForReadRegionServerLastLogRollResult(row);
- byte[] data = CellUtil.cloneValue(cell);
- rsTimestampMap.put(server, Bytes.toLong(data));
- }
- return rsTimestampMap;
- }
- }
-
- /**
- * Writes Region Server last roll log result (timestamp) to backup system table table
- * @param server Region Server name
- * @param ts last log timestamp
- * @param backupRoot root directory path to backup
- * @throws IOException exception
- */
- public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot)
- throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("write region server last roll log result to backup system table");
- }
- try (Table table = connection.getTable(tableName)) {
- Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot);
- table.put(put);
- }
- }
-
- /**
- * Get all completed backup information (in desc order by time)
- * @param onlyCompleted true, if only successfully completed sessions
- * @return history info of BackupCompleteData
- * @throws IOException exception
- */
- public ArrayList<BackupInfo> getBackupHistory(boolean onlyCompleted) throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("get backup history from backup system table");
- }
- ArrayList<BackupInfo> list;
- BackupState state = onlyCompleted ? BackupState.COMPLETE : BackupState.ANY;
- list = getBackupInfos(state);
- return BackupUtils.sortHistoryListDesc(list);
- }
-
- /**
- * Get all backups history
- * @return list of backup info
- * @throws IOException
- */
- public List<BackupInfo> getBackupHistory() throws IOException {
- return getBackupHistory(false);
- }
-
- /**
- * Get first n backup history records
- * @param n number of records, if n== -1 - max number
- * is ignored
- * @return list of records
- * @throws IOException
- */
- public List<BackupInfo> getHistory(int n) throws IOException {
-
- List<BackupInfo> history = getBackupHistory();
- if (n == -1 || history.size() <= n) return history;
- List<BackupInfo> list = new ArrayList<BackupInfo>();
- for (int i = 0; i < n; i++) {
- list.add(history.get(i));
- }
- return list;
-
- }
-
- /**
- * Get backup history records filtered by list of filters.
- * @param n max number of records, if n == -1 , then max number
- * is ignored
- * @param filters list of filters
- * @return backup records
- * @throws IOException
- */
- public List<BackupInfo> getBackupHistory(int n, BackupInfo.Filter... filters) throws IOException {
- if (filters.length == 0) return getHistory(n);
-
- List<BackupInfo> history = getBackupHistory();
- List<BackupInfo> result = new ArrayList<BackupInfo>();
- for (BackupInfo bi : history) {
- if (n >= 0 && result.size() == n) break;
- boolean passed = true;
- for (int i = 0; i < filters.length; i++) {
- if (!filters[i].apply(bi)) {
- passed = false;
- break;
- }
- }
- if (passed) {
- result.add(bi);
- }
- }
- return result;
-
- }
-
- /*
- * Retrieve TableName's for completed backup of given type
- * @param type backup type
- * @return List of table names
- */
- public List<TableName> getTablesForBackupType(BackupType type) throws IOException {
- Set<TableName> names = new HashSet<>();
- List<BackupInfo> infos = getBackupHistory(true);
- for (BackupInfo info : infos) {
- if (info.getType() != type) continue;
- names.addAll(info.getTableNames());
- }
- return new ArrayList(names);
- }
-
- /**
- * Get history for backup destination
- * @param backupRoot backup destination path
- * @return List of backup info
- * @throws IOException
- */
- public List<BackupInfo> getBackupHistory(String backupRoot) throws IOException {
- ArrayList<BackupInfo> history = getBackupHistory(false);
- for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) {
- BackupInfo info = iterator.next();
- if (!backupRoot.equals(info.getBackupRootDir())) {
- iterator.remove();
- }
- }
- return history;
- }
-
- /**
- * Get history for a table
- * @param name table name
- * @return history for a table
- * @throws IOException
- */
- public List<BackupInfo> getBackupHistoryForTable(TableName name) throws IOException {
- List<BackupInfo> history = getBackupHistory();
- List<BackupInfo> tableHistory = new ArrayList<BackupInfo>();
- for (BackupInfo info : history) {
- List<TableName> tables = info.getTableNames();
- if (tables.contains(name)) {
- tableHistory.add(info);
- }
- }
- return tableHistory;
- }
-
- public Map<TableName, ArrayList<BackupInfo>> getBackupHistoryForTableSet(Set<TableName> set,
- String backupRoot) throws IOException {
- List<BackupInfo> history = getBackupHistory(backupRoot);
- Map<TableName, ArrayList<BackupInfo>> tableHistoryMap =
- new HashMap<TableName, ArrayList<BackupInfo>>();
- for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) {
- BackupInfo info = iterator.next();
- if (!backupRoot.equals(info.getBackupRootDir())) {
- continue;
- }
- List<TableName> tables = info.getTableNames();
- for (TableName tableName : tables) {
- if (set.contains(tableName)) {
- ArrayList<BackupInfo> list = tableHistoryMap.get(tableName);
- if (list == null) {
- list = new ArrayList<BackupInfo>();
- tableHistoryMap.put(tableName, list);
- }
- list.add(info);
- }
- }
- }
- return tableHistoryMap;
- }
-
- /**
- * Get all backup sessions with a given state (in descending order by time)
- * @param state backup session state
- * @return history info of backup info objects
- * @throws IOException exception
- */
- public ArrayList<BackupInfo> getBackupInfos(BackupState state) throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("get backup infos from backup system table");
- }
-
- Scan scan = createScanForBackupHistory();
- ArrayList<BackupInfo> list = new ArrayList<BackupInfo>();
-
- try (Table table = connection.getTable(tableName);
- ResultScanner scanner = table.getScanner(scan)) {
- Result res = null;
- while ((res = scanner.next()) != null) {
- res.advance();
- BackupInfo context = cellToBackupInfo(res.current());
- if (state != BackupState.ANY && context.getState() != state) {
- continue;
- }
- list.add(context);
- }
- return list;
- }
- }
-
- /**
- * Write the current timestamps for each regionserver to backup system table after a successful
- * full or incremental backup. The saved timestamp is of the last log file that was backed up
- * already.
- * @param tables tables
- * @param newTimestamps timestamps
- * @param backupRoot root directory path to backup
- * @throws IOException exception
- */
- public void writeRegionServerLogTimestamp(Set<TableName> tables,
- HashMap<String, Long> newTimestamps, String backupRoot) throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("write RS log time stamps to backup system table for tables ["
- + StringUtils.join(tables, ",") + "]");
- }
- List<Put> puts = new ArrayList<Put>();
- for (TableName table : tables) {
- byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray();
- Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot);
- puts.add(put);
- }
- try (Table table = connection.getTable(tableName)) {
- table.put(puts);
- }
- }
-
- /**
- * Read the timestamp for each region server log after the last successful backup. Each table has
- * its own set of the timestamps. The info is stored for each table as a concatenated string of
- * rs->timestapmp
- * @param backupRoot root directory path to backup
- * @return the timestamp for each region server. key: tableName value:
- * RegionServer,PreviousTimeStamp
- * @throws IOException exception
- */
- public HashMap<TableName, HashMap<String, Long>> readLogTimestampMap(String backupRoot)
- throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("read RS log ts from backup system table for root=" + backupRoot);
- }
-
- HashMap<TableName, HashMap<String, Long>> tableTimestampMap =
- new HashMap<TableName, HashMap<String, Long>>();
-
- Scan scan = createScanForReadLogTimestampMap(backupRoot);
- try (Table table = connection.getTable(tableName);
- ResultScanner scanner = table.getScanner(scan)) {
- Result res = null;
- while ((res = scanner.next()) != null) {
- res.advance();
- Cell cell = res.current();
- byte[] row = CellUtil.cloneRow(cell);
- String tabName = getTableNameForReadLogTimestampMap(row);
- TableName tn = TableName.valueOf(tabName);
- byte[] data = CellUtil.cloneValue(cell);
- if (data == null) {
- throw new IOException("Data of last backup data from backup system table "
- + "is empty. Create a backup first.");
- }
- if (data != null && data.length > 0) {
- HashMap<String, Long> lastBackup =
- fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data));
- tableTimestampMap.put(tn, lastBackup);
- }
- }
- return tableTimestampMap;
- }
- }
-
- private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table,
- Map<String, Long> map) {
- BackupProtos.TableServerTimestamp.Builder tstBuilder =
- BackupProtos.TableServerTimestamp.newBuilder();
- tstBuilder.setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil
- .toProtoTableName(table));
-
- for (Entry<String, Long> entry : map.entrySet()) {
- BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder();
- HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder();
- ServerName sn = ServerName.parseServerName(entry.getKey());
- snBuilder.setHostName(sn.getHostname());
- snBuilder.setPort(sn.getPort());
- builder.setServerName(snBuilder.build());
- builder.setTimestamp(entry.getValue());
- tstBuilder.addServerTimestamp(builder.build());
- }
-
- return tstBuilder.build();
- }
-
- private HashMap<String, Long> fromTableServerTimestampProto(
- BackupProtos.TableServerTimestamp proto) {
- HashMap<String, Long> map = new HashMap<String, Long>();
- List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList();
- for (BackupProtos.ServerTimestamp st : list) {
- ServerName sn =
- org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(st.getServerName());
- map.put(sn.getHostname() + ":" + sn.getPort(), st.getTimestamp());
- }
- return map;
- }
-
- /**
- * Return the current tables covered by incremental backup.
- * @param backupRoot root directory path to backup
- * @return set of tableNames
- * @throws IOException exception
- */
- public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("get incremental backup table set from backup system table");
- }
- TreeSet<TableName> set = new TreeSet<>();
-
- try (Table table = connection.getTable(tableName)) {
- Get get = createGetForIncrBackupTableSet(backupRoot);
- Result res = table.get(get);
- if (res.isEmpty()) {
- return set;
- }
- List<Cell> cells = res.listCells();
- for (Cell cell : cells) {
- // qualifier = table name - we use table names as qualifiers
- set.add(TableName.valueOf(CellUtil.cloneQualifier(cell)));
- }
- return set;
- }
- }
-
- /**
- * Add tables to global incremental backup set
- * @param tables set of tables
- * @param backupRoot root directory path to backup
- * @throws IOException exception
- */
- public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot)
- throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot
- + " tables [" + StringUtils.join(tables, " ") + "]");
- for (TableName table : tables) {
- LOG.debug(table);
- }
- }
- try (Table table = connection.getTable(tableName)) {
- Put put = createPutForIncrBackupTableSet(tables, backupRoot);
- table.put(put);
- }
- }
-
- /**
- * Deletes incremental backup set for a backup destination
- * @param backupRoot backup root
- */
-
- public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Delete incremental backup table set to backup system table. ROOT=" + backupRoot);
- }
- try (Table table = connection.getTable(tableName)) {
- Delete delete = createDeleteForIncrBackupTableSet(backupRoot);
- table.delete(delete);
- }
- }
-
- /**
- * Register WAL files as eligible for deletion
- * @param files files
- * @param backupId backup id
- * @param backupRoot root directory path to backup destination
- * @throws IOException exception
- */
- public void addWALFiles(List<String> files, String backupId, String backupRoot)
- throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("add WAL files to backup system table: " + backupId + " " + backupRoot + " files ["
- + StringUtils.join(files, ",") + "]");
- for (String f : files) {
- LOG.debug("add :" + f);
- }
- }
- try (Table table = connection.getTable(tableName)) {
- List<Put> puts = createPutsForAddWALFiles(files, backupId, backupRoot);
- table.put(puts);
- }
- }
-
- /**
- * Register WAL files as eligible for deletion
- * @param backupRoot root directory path to backup
- * @throws IOException exception
- */
- public Iterator<WALItem> getWALFilesIterator(String backupRoot) throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("get WAL files from backup system table");
- }
- final Table table = connection.getTable(tableName);
- Scan scan = createScanForGetWALs(backupRoot);
- final ResultScanner scanner = table.getScanner(scan);
- final Iterator<Result> it = scanner.iterator();
- return new Iterator<WALItem>() {
-
- @Override
- public boolean hasNext() {
- boolean next = it.hasNext();
- if (!next) {
- // close all
- try {
- scanner.close();
- table.close();
- } catch (IOException e) {
- LOG.error("Close WAL Iterator", e);
- }
- }
- return next;
- }
-
- @Override
- public WALItem next() {
- Result next = it.next();
- List<Cell> cells = next.listCells();
- byte[] buf = cells.get(0).getValueArray();
- int len = cells.get(0).getValueLength();
- int offset = cells.get(0).getValueOffset();
- String backupId = new String(buf, offset, len);
- buf = cells.get(1).getValueArray();
- len = cells.get(1).getValueLength();
- offset = cells.get(1).getValueOffset();
- String walFile = new String(buf, offset, len);
- buf = cells.get(2).getValueArray();
- len = cells.get(2).getValueLength();
- offset = cells.get(2).getValueOffset();
- String backupRoot = new String(buf, offset, len);
- return new WALItem(backupId, walFile, backupRoot);
- }
-
- @Override
- public void remove() {
- // not implemented
- throw new RuntimeException("remove is not supported");
- }
- };
-
- }
-
- /**
- * Check if WAL file is eligible for deletion Future: to support all backup destinations
- * @param file name of a file to check
- * @return true, if deletable, false otherwise.
- * @throws IOException exception
- * TODO: multiple backup destination support
- */
- public boolean isWALFileDeletable(String file) throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Check if WAL file has been already backed up in backup system table " + file);
- }
- try (Table table = connection.getTable(tableName)) {
- Get get = createGetForCheckWALFile(file);
- Result res = table.get(get);
- if (res.isEmpty()) {
- return false;
- }
- return true;
- }
- }
-
- /**
- * Checks if we have at least one backup session in backup system table This API is used by
- * BackupLogCleaner
- * @return true, if - at least one session exists in backup system table table
- * @throws IOException exception
- */
- public boolean hasBackupSessions() throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Has backup sessions from backup system table");
- }
- boolean result = false;
- Scan scan = createScanForBackupHistory();
- scan.setCaching(1);
- try (Table table = connection.getTable(tableName);
- ResultScanner scanner = table.getScanner(scan)) {
- if (scanner.next() != null) {
- result = true;
- }
- return result;
- }
- }
-
- /**
- * BACKUP SETS
- */
-
- /**
- * Get backup set list
- * @return backup set list
- * @throws IOException
- */
- public List<String> listBackupSets() throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace(" Backup set list");
- }
- List<String> list = new ArrayList<String>();
- Table table = null;
- ResultScanner scanner = null;
- try {
- table = connection.getTable(tableName);
- Scan scan = createScanForBackupSetList();
- scan.setMaxVersions(1);
- scanner = table.getScanner(scan);
- Result res = null;
- while ((res = scanner.next()) != null) {
- res.advance();
- list.add(cellKeyToBackupSetName(res.current()));
- }
- return list;
- } finally {
- if (scanner != null) {
- scanner.close();
- }
- if (table != null) {
- table.close();
- }
- }
- }
-
- /**
- * Get backup set description (list of tables)
- * @param name set's name
- * @return list of tables in a backup set
- * @throws IOException
- */
- public List<TableName> describeBackupSet(String name) throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace(" Backup set describe: " + name);
- }
- Table table = null;
- try {
- table = connection.getTable(tableName);
- Get get = createGetForBackupSet(name);
- Result res = table.get(get);
- if (res.isEmpty()) return null;
- res.advance();
- String[] tables = cellValueToBackupSet(res.current());
- return toList(tables);
- } finally {
- if (table != null) {
- table.close();
- }
- }
- }
-
- private List<TableName> toList(String[] tables) {
- List<TableName> list = new ArrayList<TableName>(tables.length);
- for (String name : tables) {
- list.add(TableName.valueOf(name));
- }
- return list;
- }
-
- /**
- * Add backup set (list of tables)
- * @param name set name
- * @param newTables list of tables, comma-separated
- * @throws IOException
- */
- public void addToBackupSet(String name, String[] newTables) throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]");
- }
- Table table = null;
- String[] union = null;
- try {
- table = connection.getTable(tableName);
- Get get = createGetForBackupSet(name);
- Result res = table.get(get);
- if (res.isEmpty()) {
- union = newTables;
- } else {
- res.advance();
- String[] tables = cellValueToBackupSet(res.current());
- union = merge(tables, newTables);
- }
- Put put = createPutForBackupSet(name, union);
- table.put(put);
- } finally {
- if (table != null) {
- table.close();
- }
- }
- }
-
- private String[] merge(String[] tables, String[] newTables) {
- List<String> list = new ArrayList<String>();
- // Add all from tables
- for (String t : tables) {
- list.add(t);
- }
- for (String nt : newTables) {
- if (list.contains(nt)) continue;
- list.add(nt);
- }
- String[] arr = new String[list.size()];
- list.toArray(arr);
- return arr;
- }
-
- /**
- * Remove tables from backup set (list of tables)
- * @param name set name
- * @param toRemove list of tables
- * @throws IOException
- */
- public void removeFromBackupSet(String name, String[] toRemove) throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace(" Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ")
- + "]");
- }
- Table table = null;
- String[] disjoint = null;
- String[] tables = null;
- try {
- table = connection.getTable(tableName);
- Get get = createGetForBackupSet(name);
- Result res = table.get(get);
- if (res.isEmpty()) {
- LOG.warn("Backup set '" + name + "' not found.");
- return;
- } else {
- res.advance();
- tables = cellValueToBackupSet(res.current());
- disjoint = disjoin(tables, toRemove);
- }
- if (disjoint.length > 0 && disjoint.length != tables.length) {
- Put put = createPutForBackupSet(name, disjoint);
- table.put(put);
- } else if (disjoint.length == tables.length) {
- LOG.warn("Backup set '" + name + "' does not contain tables ["
- + StringUtils.join(toRemove, " ") + "]");
- } else { // disjoint.length == 0 and tables.length >0
- // Delete backup set
- LOG.info("Backup set '" + name + "' is empty. Deleting.");
- deleteBackupSet(name);
- }
- } finally {
- if (table != null) {
- table.close();
- }
- }
- }
-
- private String[] disjoin(String[] tables, String[] toRemove) {
- List<String> list = new ArrayList<String>();
- // Add all from tables
- for (String t : tables) {
- list.add(t);
- }
- for (String nt : toRemove) {
- if (list.contains(nt)) {
- list.remove(nt);
- }
- }
- String[] arr = new String[list.size()];
- list.toArray(arr);
- return arr;
- }
-
- /**
- * Delete backup set
- * @param name set's name
- * @throws IOException
- */
- public void deleteBackupSet(String name) throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace(" Backup set delete: " + name);
- }
- Table table = null;
- try {
- table = connection.getTable(tableName);
- Delete del = createDeleteForBackupSet(name);
- table.delete(del);
- } finally {
- if (table != null) {
- table.close();
- }
- }
- }
-
- /**
- * Get backup system table descriptor
- * @return table's descriptor
- */
- public static HTableDescriptor getSystemTableDescriptor(Configuration conf) {
-
- HTableDescriptor tableDesc = new HTableDescriptor(getTableName(conf));
- HColumnDescriptor colSessionsDesc = new HColumnDescriptor(SESSIONS_FAMILY);
- colSessionsDesc.setMaxVersions(1);
- // Time to keep backup sessions (secs)
- Configuration config = HBaseConfiguration.create();
- int ttl =
- config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
- BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
- colSessionsDesc.setTimeToLive(ttl);
- tableDesc.addFamily(colSessionsDesc);
- HColumnDescriptor colMetaDesc = new HColumnDescriptor(META_FAMILY);
- tableDesc.addFamily(colMetaDesc);
- return tableDesc;
- }
-
- public static TableName getTableName(Configuration conf) {
- String name =
- conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
- BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT);
- return TableName.valueOf(name);
- }
-
- public static String getTableNameAsString(Configuration conf) {
- return getTableName(conf).getNameAsString();
- }
-
- public static String getSnapshotName(Configuration conf) {
- return "snapshot_" + getTableNameAsString(conf).replace(":", "_");
- }
-
- /**
- * Creates Put operation for a given backup info object
- * @param context backup info
- * @return put operation
- * @throws IOException exception
- */
- private Put createPutForBackupInfo(BackupInfo context) throws IOException {
- Put put = new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId()));
- put.addColumn(BackupSystemTable.SESSIONS_FAMILY, Bytes.toBytes("context"),
- context.toByteArray());
- return put;
- }
-
- /**
- * Creates Get operation for a given backup id
- * @param backupId backup's ID
- * @return get operation
- * @throws IOException exception
- */
- private Get createGetForBackupInfo(String backupId) throws IOException {
- Get get = new Get(rowkey(BACKUP_INFO_PREFIX, backupId));
- get.addFamily(BackupSystemTable.SESSIONS_FAMILY);
- get.setMaxVersions(1);
- return get;
- }
-
- /**
- * Creates Delete operation for a given backup id
- * @param backupId backup's ID
- * @return delete operation
- * @throws IOException exception
- */
- private Delete createDeleteForBackupInfo(String backupId) {
- Delete del = new Delete(rowkey(BACKUP_INFO_PREFIX, backupId));
- del.addFamily(BackupSystemTable.SESSIONS_FAMILY);
- return del;
- }
-
- /**
- * Converts Result to BackupInfo
- * @param res HBase result
- * @return backup info instance
- * @throws IOException exception
- */
- private BackupInfo resultToBackupInfo(Result res) throws IOException {
- res.advance();
- Cell cell = res.current();
- return cellToBackupInfo(cell);
- }
-
- /**
- * Creates Get operation to retrieve start code from backup system table
- * @return get operation
- * @throws IOException exception
- */
- private Get createGetForStartCode(String rootPath) throws IOException {
- Get get = new Get(rowkey(START_CODE_ROW, rootPath));
- get.addFamily(BackupSystemTable.META_FAMILY);
- get.setMaxVersions(1);
- return get;
- }
-
- /**
- * Creates Put operation to store start code to backup system table
- * @return put operation
- * @throws IOException exception
- */
- private Put createPutForStartCode(String startCode, String rootPath) {
- Put put = new Put(rowkey(START_CODE_ROW, rootPath));
- put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("startcode"),
- Bytes.toBytes(startCode));
- return put;
- }
-
- /**
- * Creates Get to retrieve incremental backup table set from backup system table
- * @return get operation
- * @throws IOException exception
- */
- private Get createGetForIncrBackupTableSet(String backupRoot) throws IOException {
- Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot));
- get.addFamily(BackupSystemTable.META_FAMILY);
- get.setMaxVersions(1);
- return get;
- }
-
- /**
- * Creates Put to store incremental backup table set
- * @param tables tables
- * @return put operation
- */
- private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) {
- Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot));
- for (TableName table : tables) {
- put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()),
- EMPTY_VALUE);
- }
- return put;
- }
-
- /**
- * Creates Delete for incremental backup table set
- * @param backupRoot backup root
- * @return delete operation
- */
- private Delete createDeleteForIncrBackupTableSet(String backupRoot) {
- Delete delete = new Delete(rowkey(INCR_BACKUP_SET, backupRoot));
- delete.addFamily(BackupSystemTable.META_FAMILY);
- return delete;
- }
-
- /**
- * Creates Scan operation to load backup history
- * @return scan operation
- */
- private Scan createScanForBackupHistory() {
- Scan scan = new Scan();
- byte[] startRow = Bytes.toBytes(BACKUP_INFO_PREFIX);
- byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
- stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
- scan.setStartRow(startRow);
- scan.setStopRow(stopRow);
- scan.addFamily(BackupSystemTable.SESSIONS_FAMILY);
- scan.setMaxVersions(1);
- return scan;
- }
-
- /**
- * Converts cell to backup info instance.
- * @param current current cell
- * @return backup backup info instance
- * @throws IOException exception
- */
- private BackupInfo cellToBackupInfo(Cell current) throws IOException {
- byte[] data = CellUtil.cloneValue(current);
- return BackupInfo.fromByteArray(data);
- }
-
- /**
- * Creates Put to write RS last roll log timestamp map
- * @param table table
- * @param smap map, containing RS:ts
- * @return put operation
- */
- private Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap,
- String backupRoot) {
- Put put = new Put(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString()));
- put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("log-roll-map"), smap);
- return put;
- }
-
- /**
- * Creates Scan to load table-> { RS -> ts} map of maps
- * @return scan operation
- */
- private Scan createScanForReadLogTimestampMap(String backupRoot) {
- Scan scan = new Scan();
- byte[] startRow = rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot);
- byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
- stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
- scan.setStartRow(startRow);
- scan.setStopRow(stopRow);
- scan.addFamily(BackupSystemTable.META_FAMILY);
-
- return scan;
- }
-
- /**
- * Get table name from rowkey
- * @param cloneRow rowkey
- * @return table name
- */
- private String getTableNameForReadLogTimestampMap(byte[] cloneRow) {
- String s = Bytes.toString(cloneRow);
- int index = s.lastIndexOf(NULL);
- return s.substring(index + 1);
- }
-
- /**
- * Creates Put to store RS last log result
- * @param server server name
- * @param timestamp log roll result (timestamp)
- * @return put operation
- */
- private Put createPutForRegionServerLastLogRollResult(String server, Long timestamp,
- String backupRoot) {
- Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server));
- put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("rs-log-ts"),
- Bytes.toBytes(timestamp));
- return put;
- }
-
- /**
- * Creates Scan operation to load last RS log roll results
- * @return scan operation
- */
- private Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) {
- Scan scan = new Scan();
- byte[] startRow = rowkey(RS_LOG_TS_PREFIX, backupRoot);
- byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
- stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
- scan.setStartRow(startRow);
- scan.setStopRow(stopRow);
- scan.addFamily(BackupSystemTable.META_FAMILY);
- scan.setMaxVersions(1);
-
- return scan;
- }
-
- /**
- * Get server's name from rowkey
- * @param row rowkey
- * @return server's name
- */
- private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) {
- String s = Bytes.toString(row);
- int index = s.lastIndexOf(NULL);
- return s.substring(index + 1);
- }
-
- /*
- * Creates Put's for bulk load resulting from running LoadIncrementalHFiles
- */
- static List<Put> createPutForCommittedBulkload(TableName table, byte[] region,
- Map<byte[], List<Path>> finalPaths) {
- List<Put> puts = new ArrayList<>();
- for (Map.Entry<byte[], List<Path>> entry : finalPaths.entrySet()) {
- for (Path path : entry.getValue()) {
- String file = path.toString();
- int lastSlash = file.lastIndexOf("/");
- String filename = file.substring(lastSlash + 1);
- Put put =
- new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
- Bytes.toString(region), BLK_LD_DELIM, filename));
- put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
- put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
- put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes());
- put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT);
- puts.add(put);
- LOG.debug("writing done bulk path " + file + " for " + table + " " + Bytes.toString(region));
- }
- }
- return puts;
- }
-
- public static void snapshot(Connection conn) throws IOException {
-
- try (Admin admin = conn.getAdmin();) {
- Configuration conf = conn.getConfiguration();
- admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf));
- }
- }
-
- public static void restoreFromSnapshot(Connection conn) throws IOException {
-
- Configuration conf = conn.getConfiguration();
- LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot");
- try (Admin admin = conn.getAdmin();) {
- String snapshotName = BackupSystemTable.getSnapshotName(conf);
- if (snapshotExists(admin, snapshotName)) {
- admin.disableTable(BackupSystemTable.getTableName(conf));
- admin.restoreSnapshot(snapshotName);
- admin.enableTable(BackupSystemTable.getTableName(conf));
- LOG.debug("Done restoring backup system table");
- } else {
- // Snapshot does not exists, i.e completeBackup failed after
- // deleting backup system table snapshot
- // In this case we log WARN and proceed
- LOG.warn("Could not restore backup system table. Snapshot " + snapshotName
- + " does not exists.");
- }
- }
- }
-
- protected static boolean snapshotExists(Admin admin, String snapshotName) throws IOException {
-
- List<SnapshotDescription> list = admin.listSnapshots();
- for (SnapshotDescription desc : list) {
- if (desc.getName().equals(snapshotName)) {
- return true;
- }
- }
- return false;
- }
-
- public static boolean snapshotExists(Connection conn) throws IOException {
- return snapshotExists(conn.getAdmin(), getSnapshotName(conn.getConfiguration()));
- }
-
- public static void deleteSnapshot(Connection conn) throws IOException {
-
- Configuration conf = conn.getConfiguration();
- LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system");
- try (Admin admin = conn.getAdmin();) {
- String snapshotName = BackupSystemTable.getSnapshotName(conf);
- if (snapshotExists(admin, snapshotName)) {
- admin.deleteSnapshot(snapshotName);
- LOG.debug("Done deleting backup system table snapshot");
- } else {
- LOG.error("Snapshot " + snapshotName + " does not exists");
- }
- }
- }
-
- /*
- * Creates Put's for bulk load resulting from running LoadIncrementalHFiles
- */
- static List<Put> createPutForPreparedBulkload(TableName table, byte[] region,
- final byte[] family, final List<Pair<Path, Path>> pairs) {
- List<Put> puts = new ArrayList<>();
- for (Pair<Path, Path> pair : pairs) {
- Path path = pair.getSecond();
- String file = path.toString();
- int lastSlash = file.lastIndexOf("/");
- String filename = file.substring(lastSlash + 1);
- Put put =
- new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, Bytes.toString(region),
- BLK_LD_DELIM, filename));
- put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
- put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family);
- put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes());
- put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE);
- puts.add(put);
- LOG.debug("writing raw bulk path " + file + " for " + table + " " + Bytes.toString(region));
- }
- return puts;
- }
-
- public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) {
- List<Delete> lstDels = new ArrayList<>();
- for (TableName table : lst) {
- Delete del = new Delete(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM));
- del.addFamily(BackupSystemTable.META_FAMILY);
- lstDels.add(del);
- }
- return lstDels;
- }
-
- private Put createPutForDeleteOperation(String[] backupIdList) {
-
- byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ","));
- Put put = new Put(DELETE_OP_ROW);
- put.addColumn(META_FAMILY, FAM_COL, value);
- return put;
- }
-
- private Delete createDeleteForBackupDeleteOperation() {
-
- Delete delete = new Delete(DELETE_OP_ROW);
- delete.addFamily(META_FAMILY);
- return delete;
- }
-
- private Get createGetForDeleteOperation() {
-
- Get get = new Get(DELETE_OP_ROW);
- get.addFamily(META_FAMILY);
- return get;
- }
-
- public void startDeleteOperation(String[] backupIdList) throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList));
- }
- Put put = createPutForDeleteOperation(backupIdList);
- try (Table table = connection.getTable(tableName)) {
- table.put(put);
- }
- }
-
- public void finishDeleteOperation() throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Finsih delete operation for backup ids ");
- }
- Delete delete = createDeleteForBackupDeleteOperation();
- try (Table table = connection.getTable(tableName)) {
- table.delete(delete);
- }
- }
-
- public String[] getListOfBackupIdsFromDeleteOperation() throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Get delete operation for backup ids ");
- }
- Get get = createGetForDeleteOperation();
- try (Table table = connection.getTable(tableName)) {
- Result res = table.get(get);
- if (res.isEmpty()) {
- return null;
- }
- Cell cell = res.listCells().get(0);
- byte[] val = CellUtil.cloneValue(cell);
- if (val.length == 0) {
- return null;
- }
- return new String(val).split(",");
- }
- }
-
- private Put createPutForMergeOperation(String[] backupIdList) {
-
- byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ","));
- Put put = new Put(MERGE_OP_ROW);
- put.addColumn(META_FAMILY, FAM_COL, value);
- return put;
- }
-
- public boolean isMergeInProgress() throws IOException {
- Get get = new Get(MERGE_OP_ROW);
- try (Table table = connection.getTable(tableName)) {
- Result res = table.get(get);
- if (res.isEmpty()) {
- return false;
- }
- return true;
- }
- }
-
- private Put createPutForUpdateTablesForMerge(List<TableName> tables) {
-
- byte[] value = Bytes.toBytes(StringUtils.join(tables, ","));
- Put put = new Put(MERGE_OP_ROW);
- put.addColumn(META_FAMILY, PATH_COL, value);
- return put;
- }
-
- private Delete createDeleteForBackupMergeOperation() {
-
- Delete delete = new Delete(MERGE_OP_ROW);
- delete.addFamily(META_FAMILY);
- return delete;
- }
-
- private Get createGetForMergeOperation() {
-
- Get get = new Get(MERGE_OP_ROW);
- get.addFamily(META_FAMILY);
- return get;
- }
-
- public void startMergeOperation(String[] backupIdList) throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList));
- }
- Put put = createPutForMergeOperation(backupIdList);
- try (Table table = connection.getTable(tableName)) {
- table.put(put);
- }
- }
-
- public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Update tables for merge : " + StringUtils.join(tables, ","));
- }
- Put put = createPutForUpdateTablesForMerge(tables);
- try (Table table = connection.getTable(tableName)) {
- table.put(put);
- }
- }
-
- public void finishMergeOperation() throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Finsih merge operation for backup ids ");
- }
- Delete delete = createDeleteForBackupMergeOperation();
- try (Table table = connection.getTable(tableName)) {
- table.delete(delete);
- }
- }
-
- public String[] getListOfBackupIdsFromMergeOperation() throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Get backup ids for merge operation");
- }
- Get get = createGetForMergeOperation();
- try (Table table = connection.getTable(tableName)) {
- Result res = table.get(get);
- if (res.isEmpty()) {
- return null;
- }
- Cell cell = res.listCells().get(0);
- byte[] val = CellUtil.cloneValue(cell);
- if (val.length == 0) {
- return null;
- }
- return new String(val).split(",");
- }
- }
-
- static Scan createScanForOrigBulkLoadedFiles(TableName table) throws IOException {
- Scan scan = new Scan();
- byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM);
- byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
- stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
- scan.withStartRow(startRow);
- scan.withStopRow(stopRow);
- scan.addFamily(BackupSystemTable.META_FAMILY);
- scan.setMaxVersions(1);
- return scan;
- }
-
- static String getTableNameFromOrigBulkLoadRow(String rowStr) {
- String[] parts = rowStr.split(BLK_LD_DELIM);
- return parts[1];
- }
-
- static String getRegionNameFromOrigBulkLoadRow(String rowStr) {
- // format is bulk : namespace : table : region : file
- String[] parts = rowStr.split(BLK_LD_DELIM);
- int idx = 3;
- if (parts.length == 4) {
- // the table is in default namespace
- idx = 2;
- }
- LOG.debug("bulk row string " + rowStr + " region " + parts[idx]);
- return parts[idx];
- }
-
- /*
- * Used to query bulk loaded hfiles which have been copied by incremental backup
- * @param backupId the backup Id. It can be null when querying for all tables
- * @return the Scan object
- */
- static Scan createScanForBulkLoadedFiles(String backupId) throws IOException {
- Scan scan = new Scan();
- byte[] startRow =
- backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId
- + BLK_LD_DELIM);
- byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
- stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
- scan.setStartRow(startRow);
- scan.setStopRow(stopRow);
- // scan.setTimeRange(lower, Long.MAX_VALUE);
- scan.addFamily(BackupSystemTable.META_FAMILY);
- scan.setMaxVersions(1);
- return scan;
- }
-
- static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId,
- long ts, int idx) {
- Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM + ts + BLK_LD_DELIM + idx));
- put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName());
- put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam);
- put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, p.getBytes());
- return put;
- }
-
- /**
- * Creates put list for list of WAL files
- * @param files list of WAL file paths
- * @param backupId backup id
- * @return put list
- * @throws IOException exception
- */
- private List<Put>
- createPutsForAddWALFiles(List<String> files, String backupId, String backupRoot)
- throws IOException {
-
- List<Put> puts = new ArrayList<Put>();
- for (String file : files) {
- Put put = new Put(rowkey(WALS_PREFIX, BackupUtils.getUniqueWALFileNamePart(file)));
- put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("backupId"),
- Bytes.toBytes(backupId));
- put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("file"), Bytes.toBytes(file));
- put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("root"), Bytes.toBytes(backupRoot));
- puts.add(put);
- }
- return puts;
- }
-
- /**
- * Creates Scan operation to load WALs
- * @param backupRoot path to backup destination
- * @return scan operation
- */
- private Scan createScanForGetWALs(String backupRoot) {
- // TODO: support for backupRoot
- Scan scan = new Scan();
- byte[] startRow = Bytes.toBytes(WALS_PREFIX);
- byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
- stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
- scan.setStartRow(startRow);
- scan.setStopRow(stopRow);
- scan.addFamily(BackupSystemTable.META_FAMILY);
- return scan;
- }
-
- /**
- * Creates Get operation for a given wal file name TODO: support for backup destination
- * @param file file
- * @return get operation
- * @throws IOException exception
- */
- private Get createGetForCheckWALFile(String file) throws IOException {
- Get get = new Get(rowkey(WALS_PREFIX, BackupUtils.getUniqueWALFileNamePart(file)));
- // add backup root column
- get.addFamily(BackupSystemTable.META_FAMILY);
- return get;
- }
-
- /**
- * Creates Scan operation to load backup set list
- * @return scan operation
- */
- private Scan createScanForBackupSetList() {
- Scan scan = new Scan();
- byte[] startRow = Bytes.toBytes(SET_KEY_PREFIX);
- byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
- stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
- scan.setStartRow(startRow);
- scan.setStopRow(stopRow);
- scan.addFamily(BackupSystemTable.META_FAMILY);
- return scan;
- }
-
- /**
- * Creates Get operation to load backup set content
- * @return get operation
- */
- private Get createGetForBackupSet(String name) {
- Get get = new Get(rowkey(SET_KEY_PREFIX, name));
- get.addFamily(BackupSystemTable.META_FAMILY);
- return get;
- }
-
- /**
- * Creates Delete operation to delete backup set content
- * @param name backup set's name
- * @return delete operation
- */
- private Delete createDeleteForBackupSet(String name) {
- Delete del = new Delete(rowkey(SET_KEY_PREFIX, name));
- del.addFamily(BackupSystemTable.META_FAMILY);
- return del;
- }
-
- /**
- * Creates Put operation to update backup set content
- * @param name backup set's name
- * @param tables list of tables
- * @return put operation
- */
- private Put createPutForBackupSet(String name, String[] tables) {
- Put put = new Put(rowkey(SET_KEY_PREFIX, name));
- byte[] value = convertToByteArray(tables);
- put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("tables"), value);
- return put;
- }
-
- private byte[] convertToByteArray(String[] tables) {
- return StringUtils.join(tables, ",").getBytes();
- }
-
- /**
- * Converts cell to backup set list.
- * @param current current cell
- * @return backup set as array of table names
- * @throws IOException
- */
- private String[] cellValueToBackupSet(Cell current) throws IOException {
- byte[] data = CellUtil.cloneValue(current);
- if (data != null && data.length > 0) {
- return Bytes.toString(data).split(",");
- } else {
- return new String[0];
- }
- }
-
- /**
- * Converts cell key to backup set name.
- * @param current current cell
- * @return backup set name
- * @throws IOException
- */
- private String cellKeyToBackupSetName(Cell current) throws IOException {
- byte[] data = CellUtil.cloneRow(current);
- return Bytes.toString(data).substring(SET_KEY_PREFIX.length());
- }
-
- private static byte[] rowkey(String s, String... other) {
- StringBuilder sb = new StringBuilder(s);
- for (String ss : other) {
- sb.append(ss);
- }
- return sb.toString().getBytes();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/2dda3712/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
deleted file mode 100644
index e323e96..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.backup.impl;
-
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_ATTEMPTS_PAUSE_MS_KEY;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_MAX_ATTEMPTS_KEY;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_BACKUP_ATTEMPTS_PAUSE_MS;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_BACKUP_MAX_ATTEMPTS;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.BackupCopyJob;
-import org.apache.hadoop.hbase.backup.BackupInfo;
-import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
-import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
-import org.apache.hadoop.hbase.backup.BackupRequest;
-import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
-import org.apache.hadoop.hbase.backup.BackupType;
-import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
-import org.apache.hadoop.hbase.backup.util.BackupUtils;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-
-/**
- * Full table backup implementation
- *
- */
-@InterfaceAudience.Private
-public class FullTableBackupClient extends TableBackupClient {
- private static final Log LOG = LogFactory.getLog(FullTableBackupClient.class);
-
- public FullTableBackupClient() {
- }
-
- public FullTableBackupClient(final Connection conn, final String backupId, BackupRequest request)
- throws IOException {
- super(conn, backupId, request);
- }
-
- /**
- * Do snapshot copy.
- * @param backupInfo backup info
- * @throws Exception exception
- */
- protected void snapshotCopy(BackupInfo backupInfo) throws Exception {
- LOG.info("Snapshot copy is starting.");
-
- // set overall backup phase: snapshot_copy
- backupInfo.setPhase(BackupPhase.SNAPSHOTCOPY);
-
- // call ExportSnapshot to copy files based on hbase snapshot for backup
- // ExportSnapshot only support single snapshot export, need loop for multiple tables case
- BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
-
- // number of snapshots matches number of tables
- float numOfSnapshots = backupInfo.getSnapshotNames().size();
-
- LOG.debug("There are " + (int) numOfSnapshots + " snapshots to be copied.");
-
- for (TableName table : backupInfo.getTables()) {
- // Currently we simply set the sub copy tasks by counting the table snapshot number, we can
- // calculate the real files' size for the percentage in the future.
- // backupCopier.setSubTaskPercntgInWholeTask(1f / numOfSnapshots);
- int res = 0;
- String[] args = new String[4];
- args[0] = "-snapshot";
- args[1] = backupInfo.getSnapshotName(table);
- args[2] = "-copy-to";
- args[3] = backupInfo.getTableBackupDir(table);
-
- LOG.debug("Copy snapshot " + args[1] + " to " + args[3]);
- res = copyService.copy(backupInfo, backupManager, conf, BackupType.FULL, args);
- // if one snapshot export failed, do not continue for remained snapshots
- if (res != 0) {
- LOG.error("Exporting Snapshot " + args[1] + " failed with return code: " + res + ".");
-
- throw new IOException("Failed of exporting snapshot " + args[1] + " to " + args[3]
- + " with reason code " + res);
- }
- LOG.info("Snapshot copy " + args[1] + " finished.");
- }
- }
-
- /**
- * Backup request execution
- * @throws IOException
- */
- @Override
- public void execute() throws IOException {
- try (Admin admin = conn.getAdmin();) {
-
- // Begin BACKUP
- beginBackup(backupManager, backupInfo);
- String savedStartCode = null;
- boolean firstBackup = false;
- // do snapshot for full table backup
-
- savedStartCode = backupManager.readBackupStartCode();
- firstBackup = savedStartCode == null || Long.parseLong(savedStartCode) == 0L;
- if (firstBackup) {
- // This is our first backup. Let's put some marker to system table so that we can hold the logs
- // while we do the backup.
- backupManager.writeBackupStartCode(0L);
- }
- // We roll log here before we do the snapshot. It is possible there is duplicate data
- // in the log that is already in the snapshot. But if we do it after the snapshot, we
- // could have data loss.
- // A better approach is to do the roll log on each RS in the same global procedure as
- // the snapshot.
- LOG.info("Execute roll log procedure for full backup ...");
-
- Map<String, String> props = new HashMap<String, String>();
- props.put("backupRoot", backupInfo.getBackupRootDir());
- admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
- LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
-
- newTimestamps = backupManager.readRegionServerLastLogRollResult();
- if (firstBackup) {
- // Updates registered log files
- // We record ALL old WAL files as registered, because
- // this is a first full backup in the system and these
- // files are not needed for next incremental backup
- List<String> logFiles = BackupUtils.getWALFilesOlderThan(conf, newTimestamps);
- backupManager.recordWALFiles(logFiles);
- }
-
- // SNAPSHOT_TABLES:
- backupInfo.setPhase(BackupPhase.SNAPSHOT);
- for (TableName tableName : tableList) {
- String snapshotName =
- "snapshot_" + Long.toString(EnvironmentEdgeManager.currentTime()) + "_"
- + tableName.getNamespaceAsString() + "_" + tableName.getQualifierAsString();
-
- snapshotTable(admin, tableName, snapshotName);
- backupInfo.setSnapshotName(tableName, snapshotName);
- }
-
- // SNAPSHOT_COPY:
- // do snapshot copy
- LOG.debug("snapshot copy for " + backupId);
- snapshotCopy(backupInfo);
- // Updates incremental backup table set
- backupManager.addIncrementalBackupTableSet(backupInfo.getTables());
-
- // BACKUP_COMPLETE:
- // set overall backup status: complete. Here we make sure to complete the backup.
- // After this checkpoint, even if entering cancel process, will let the backup finished
- backupInfo.setState(BackupState.COMPLETE);
- // The table list in backupInfo is good for both full backup and incremental backup.
- // For incremental backup, it contains the incremental backup table set.
- backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
-
- HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
- backupManager.readLogTimestampMap();
-
- Long newStartCode =
- BackupUtils.getMinValue(BackupUtils
- .getRSLogTimestampMins(newTableSetTimestampMap));
- backupManager.writeBackupStartCode(newStartCode);
-
- // backup complete
- completeBackup(conn, backupInfo, backupManager, BackupType.FULL, conf);
- } catch (Exception e) {
- failBackup(conn, backupInfo, backupManager, e, "Unexpected BackupException : ",
- BackupType.FULL, conf);
- throw new IOException(e);
- }
-
- }
-
-
- protected void snapshotTable(Admin admin, TableName tableName, String snapshotName)
- throws IOException {
-
- int maxAttempts =
- conf.getInt(BACKUP_MAX_ATTEMPTS_KEY, DEFAULT_BACKUP_MAX_ATTEMPTS);
- int pause =
- conf.getInt(BACKUP_ATTEMPTS_PAUSE_MS_KEY, DEFAULT_BACKUP_ATTEMPTS_PAUSE_MS);
- int attempts = 0;
-
- while (attempts++ < maxAttempts) {
- try {
- admin.snapshot(snapshotName, tableName);
- return;
- } catch (IOException ee) {
- LOG.warn("Snapshot attempt " + attempts + " failed for table " + tableName
- + ", sleeping for " + pause + "ms", ee);
- if (attempts < maxAttempts) {
- try {
- Thread.sleep(pause);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- break;
- }
- }
- }
- }
- throw new IOException("Failed to snapshot table "+ tableName);
- }
-}