Posted to commits@hbase.apache.org by te...@apache.org on 2015/12/27 19:02:20 UTC
[4/6] hbase git commit: HBASE-14030 HBase Backup/Restore Phase 1
(Vladimir Rodionov)
http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupSystemTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupSystemTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupSystemTable.java
new file mode 100644
index 0000000..14769f9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupSystemTable.java
@@ -0,0 +1,642 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupHandler.BACKUPSTATUS;
+import org.apache.hadoop.hbase.backup.BackupUtil.BackupCompleteData;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+
+/**
+ * This class provides the 'hbase:backup' table API.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class BackupSystemTable {
+
+ private static final Log LOG = LogFactory.getLog(BackupSystemTable.class);
+ private final static String TABLE_NAMESPACE = "hbase";
+ private final static String TABLE_NAME = "backup";
+ private final static TableName tableName = TableName.valueOf(TABLE_NAMESPACE, TABLE_NAME);
+ public final static byte[] familyName = "f".getBytes();
+
+ // Connection to HBase cluster
+ private static Connection connection;
+ // Cluster configuration
+ private static Configuration config;
+ // singleton
+ private static BackupSystemTable table;
+
+ /**
+ * Get instance by a given configuration
+ * @param conf - HBase configuration
+ * @return instance of BackupSystemTable
+ * @throws IOException exception
+ */
+ public synchronized static BackupSystemTable getTable(Configuration conf) throws IOException {
+ if (connection == null) {
+ connection = ConnectionFactory.createConnection(conf);
+ config = conf;
+ // Verify that hbase:backup exists; create it if it does not
+ createSystemTableIfNotExists();
+ table = new BackupSystemTable();
+ }
+ return table;
+ }
+
+ /**
+ * Closes the shared connection and releases the singleton. TODO: refactor
+ * @throws IOException exception
+ */
+ public static void close() throws IOException {
+ connection.close();
+ table = null;
+ }
+
+ /**
+ * Gets table name
+ * @return table name
+ */
+ public static TableName getTableName() {
+ return tableName;
+ }
+
+ private static void createSystemTableIfNotExists() throws IOException {
+ Admin admin = null;
+ try {
+ admin = connection.getAdmin();
+ if (!admin.tableExists(tableName)) {
+ HTableDescriptor tableDesc = new HTableDescriptor(tableName);
+ HColumnDescriptor colDesc = new HColumnDescriptor(familyName);
+ colDesc.setMaxVersions(1);
+ int ttl =
+ config.getInt(HConstants.BACKUP_SYSTEM_TTL_KEY, HConstants.BACKUP_SYSTEM_TTL_DEFAULT);
+ colDesc.setTimeToLive(ttl);
+ tableDesc.addFamily(colDesc);
+ admin.createTable(tableDesc);
+ }
+ } catch (IOException e) {
+ LOG.error(e);
+ throw e;
+ } finally {
+ if (admin != null) {
+ admin.close();
+ }
+ }
+ }
+
+ private BackupSystemTable() {
+ }
+
+ /**
+ * Updates status (state) of a backup session in hbase:backup table
+ * @param context context
+ * @throws IOException exception
+ */
+ public void updateBackupStatus(BackupContext context) throws IOException {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("update backup status in hbase:backup for: " + context.getBackupId()
+ + " set status=" + context.getFlag());
+ }
+ Table table = null;
+ try {
+ table = connection.getTable(tableName);
+ Put put = BackupSystemTableHelper.createPutForBackupContext(context);
+ table.put(put);
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ /**
+ * Deletes backup status from hbase:backup table
+ * @param backupId backup id
+ * @throws IOException exception
+ */
+
+ public void deleteBackupStatus(String backupId) throws IOException {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("delete backup status in hbase:backup for " + backupId);
+ }
+ Table table = null;
+ try {
+ table = connection.getTable(tableName);
+ Delete del = BackupSystemTableHelper.createDeleteForBackupContext(backupId);
+ table.delete(del);
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ /**
+ * Reads backup status object (instance of BackupContext) from hbase:backup table
+ * @param backupId - backupId
+ * @return Current status of backup session or null
+ */
+
+ public BackupContext readBackupStatus(String backupId) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("read backup status from hbase:backup for: " + backupId);
+ }
+
+ Table table = null;
+ try {
+ table = connection.getTable(tableName);
+ Get get = BackupSystemTableHelper.createGetForBackupContext(backupId);
+ Result res = table.get(get);
+ if(res.isEmpty()){
+ return null;
+ }
+ return BackupSystemTableHelper.resultToBackupContext(res);
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ /**
+ * Reads the start code (timestamp) of the last successful backup. Returns null if no start
+ * code is stored in HBase or the stored value has length 0; both cases indicate that no
+ * backup has completed successfully so far.
+ * @return the timestamp of the last successful backup
+ * @throws IOException exception
+ */
+ public String readBackupStartCode() throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("read backup start code from hbase:backup");
+ }
+ Table table = null;
+ try {
+ table = connection.getTable(tableName);
+ Get get = BackupSystemTableHelper.createGetForStartCode();
+ Result res = table.get(get);
+ if (res.isEmpty()){
+ return null;
+ }
+ Cell cell = res.listCells().get(0);
+ byte[] val = CellUtil.cloneValue(cell);
+ if (val.length == 0){
+ return null;
+ }
+ return new String(val);
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ /**
+ * Writes the start code (timestamp) to hbase:backup. A null start code is stored as a zero-length value.
+ * @param startCode start code
+ * @throws IOException exception
+ */
+ public void writeBackupStartCode(String startCode) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("write backup start code to hbase:backup " + startCode);
+ }
+ Table table = null;
+ try {
+ table = connection.getTable(tableName);
+ Put put = BackupSystemTableHelper.createPutForStartCode(startCode);
+ table.put(put);
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ /**
+ * Get the Region Servers log information after the last log roll from hbase:backup.
+ * @return RS log info
+ * @throws IOException exception
+ */
+ public HashMap<String, String> readRegionServerLastLogRollResult()
+ throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("read region server last roll log result to hbase:backup");
+ }
+ Table table = null;
+ ResultScanner scanner = null;
+
+ try {
+ table = connection.getTable(tableName);
+ Scan scan = BackupSystemTableHelper.createScanForReadRegionServerLastLogRollResult();
+ scan.setMaxVersions(1);
+ scanner = table.getScanner(scan);
+ Result res = null;
+ HashMap<String, String> rsTimestampMap = new HashMap<String, String>();
+ while ((res = scanner.next()) != null) {
+ res.advance();
+ Cell cell = res.current();
+ byte[] row = CellUtil.cloneRow(cell);
+ String server =
+ BackupSystemTableHelper.getServerNameForReadRegionServerLastLogRollResult(row);
+
+ byte[] data = CellUtil.cloneValue(cell);
+ rsTimestampMap.put(server, new String(data));
+ }
+ return rsTimestampMap;
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ /**
+ * Writes Region Server last roll log result (timestamp) to hbase:backup table
+ * @param server - Region Server name
+ * @param fileName - last log timestamp
+ * @throws IOException exception
+ */
+ public void writeRegionServerLastLogRollResult(String server, String fileName)
+ throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("write region server last roll log result to hbase:backup");
+ }
+ Table table = null;
+ try {
+ table = connection.getTable(tableName);
+ Put put =
+ BackupSystemTableHelper.createPutForRegionServerLastLogRollResult(server, fileName);
+ table.put(put);
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ /**
+ * Get all completed backup information (in desc order by time)
+ * @return history info of BackupCompleteData
+ * @throws IOException exception
+ */
+ public ArrayList<BackupCompleteData> getBackupHistory() throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("get backup history from hbase:backup");
+ }
+ Table table = null;
+ ResultScanner scanner = null;
+ ArrayList<BackupCompleteData> list = new ArrayList<BackupCompleteData>();
+ try {
+ table = connection.getTable(tableName);
+ Scan scan = BackupSystemTableHelper.createScanForBackupHistory();
+ scan.setMaxVersions(1);
+ scanner = table.getScanner(scan);
+ Result res = null;
+ while ((res = scanner.next()) != null) {
+ res.advance();
+ BackupContext context = BackupSystemTableHelper.cellToBackupContext(res.current());
+ if (context.getFlag() != BACKUPSTATUS.COMPLETE) {
+ continue;
+ }
+
+ BackupCompleteData history = new BackupCompleteData();
+ history.setBackupToken(context.getBackupId());
+ history.setStartTime(Long.toString(context.getStartTs()));
+ history.setEndTime(Long.toString(context.getEndTs()));
+ history.setBackupRootPath(context.getTargetRootDir());
+ history.setTableList(context.getTableListAsString());
+ history.setType(context.getType());
+ history.setBytesCopied(Long.toString(context.getTotalBytesCopied()));
+
+ if (context.fromExistingSnapshot()) {
+ history.markFromExistingSnapshot();
+ }
+ list.add(history);
+ }
+ return BackupUtil.sortHistoryListDesc(list);
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ /**
+ * Gets all backup sessions with a given status
+ * @param status status
+ * @return history info of backup contexts
+ * @throws IOException exception
+ */
+ public ArrayList<BackupContext> getBackupContexts(BACKUPSTATUS status) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("get backup contexts from hbase:backup");
+ }
+ Table table = null;
+ ResultScanner scanner = null;
+ ArrayList<BackupContext> list = new ArrayList<BackupContext>();
+ try {
+ table = connection.getTable(tableName);
+ Scan scan = BackupSystemTableHelper.createScanForBackupHistory();
+ scan.setMaxVersions(1);
+ scanner = table.getScanner(scan);
+ Result res = null;
+ while ((res = scanner.next()) != null) {
+ res.advance();
+ BackupContext context = BackupSystemTableHelper.cellToBackupContext(res.current());
+ if (context.getFlag() != status){
+ continue;
+ }
+ list.add(context);
+ }
+ return list;
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ /**
+ * Write the current timestamps for each regionserver to hbase:backup after a successful full or
+ * incremental backup. The saved timestamp is of the last log file that was backed up already.
+ * @param tables tables
+ * @param newTimestamps timestamps
+ * @throws IOException exception
+ */
+ public void writeRegionServerLogTimestamp(Set<String> tables,
+ HashMap<String, String> newTimestamps) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("write RS log ts to HBASE_BACKUP");
+ }
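+ // Serialized format: host\001timestamp\002host\001timestamp\002... using the
+ // FIELD_SEPARATOR and RECORD_SEPARATOR control characters defined in BackupUtil.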
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<String, String> entry : newTimestamps.entrySet()) {
+ String host = entry.getKey();
+ String timestamp = entry.getValue();
+ sb.append(host).append(BackupUtil.FIELD_SEPARATOR).append(timestamp)
+ .append(BackupUtil.RECORD_SEPARATOR);
+ }
+ String smap = sb.toString();
+ List<Put> puts = new ArrayList<Put>();
+ for (String table : tables) {
+ Put put = BackupSystemTableHelper.createPutForWriteRegionServerLogTimestamp(table, smap);
+ puts.add(put);
+ }
+ Table table = null;
+ try {
+ table = connection.getTable(tableName);
+ table.put(puts);
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ /**
+ * Reads the timestamp of each region server log after the last successful backup. Each table
+ * has its own set of timestamps. The info is stored for each table as a concatenated string of
+ * rs->timestamp pairs.
+ * @return the timestamp for each region server. key: tableName value:
+ * RegionServer,PreviousTimeStamp
+ * @throws IOException exception
+ */
+ public HashMap<String, HashMap<String, String>> readLogTimestampMap() throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("read RS log ts from HBASE_BACKUP");
+ }
+
+ Table table = null;
+ ResultScanner scanner = null;
+ HashMap<String, HashMap<String, String>> tableTimestampMap =
+ new HashMap<String, HashMap<String, String>>();
+
+ try {
+ table = connection.getTable(tableName);
+ Scan scan = BackupSystemTableHelper.createScanForReadLogTimestampMap();
+ scanner = table.getScanner(scan);
+ Result res = null;
+ while ((res = scanner.next()) != null) {
+ res.advance();
+ Cell cell = res.current();
+ byte[] row = CellUtil.cloneRow(cell);
+ String tabName = BackupSystemTableHelper.getTableNameForReadLogTimestampMap(row);
+ HashMap<String, String> lastBackup = new HashMap<String, String>();
+ byte[] data = CellUtil.cloneValue(cell);
+ if (data == null) {
+ // TODO
+ throw new IOException("Data of last backup session from HBASE_BACKUP "
+ + "is empty. Create a backup first.");
+ }
+ if (data.length > 0) {
+ String s = new String(data);
+ String[] records = s.split(BackupUtil.RECORD_SEPARATOR);
+ for (String record : records) {
+ String[] flds = record.split(BackupUtil.FIELD_SEPARATOR);
+ if (flds.length != 2) {
+ throw new IOException("data from HBASE_BACKUP is corrupted: "
+ + Arrays.toString(flds));
+ }
+ lastBackup.put(flds[0], flds[1]);
+ }
+ tableTimestampMap.put(tabName, lastBackup);
+ }
+ }
+ return tableTimestampMap;
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ /**
+ * Return the current tables covered by incremental backup.
+ * @return set of tableNames
+ * @throws IOException exception
+ */
+ public Set<String> getIncrementalBackupTableSet() throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("get incr backup table set from hbase:backup");
+ }
+ Table table = null;
+ TreeSet<String> set = new TreeSet<String>();
+
+ try {
+ table = connection.getTable(tableName);
+ Get get = BackupSystemTableHelper.createGetForIncrBackupTableSet();
+ Result res = table.get(get);
+ if (res.isEmpty()) {
+ return set;
+ }
+ List<Cell> cells = res.listCells();
+ for (Cell cell : cells) {
+ // qualifier = table name - we use table names as qualifiers
+ // TODO ns:table as qualifier?
+ set.add(new String(CellUtil.cloneQualifier(cell)));
+ }
+ return set;
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ /**
+ * Add tables to global incremental backup set
+ * @param tables - set of tables
+ * @throws IOException exception
+ */
+ public void addIncrementalBackupTableSet(Set<String> tables) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("add incr backup table set to hbase:backup");
+ }
+ Table table = null;
+ try {
+ table = connection.getTable(tableName);
+ Put put = BackupSystemTableHelper.createPutForIncrBackupTableSet(tables);
+ table.put(put);
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ /**
+ * Register WAL files as eligible for deletion
+ * @param files files
+ * @throws IOException exception
+ */
+ public void addWALFiles(List<String> files, String backupId) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("add WAL files to hbase:backup");
+ }
+ Table table = null;
+ try {
+ table = connection.getTable(tableName);
+ List<Put> puts = BackupSystemTableHelper.createPutsForAddWALFiles(files, backupId);
+ table.put(puts);
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ /**
+ * Checks whether a WAL file is eligible for deletion, i.e. has already been backed up
+ * @param file file
+ * @return true if the file has been backed up and may be deleted
+ * @throws IOException exception
+ */
+ public boolean checkWALFile(String file) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Check if WAL file has been already backuped in hbase:backup");
+ }
+ Table table = null;
+ try {
+ table = connection.getTable(tableName);
+ Get get = BackupSystemTableHelper.createGetForCheckWALFile(file);
+ Result res = table.get(get);
+ return !res.isEmpty();
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ /**
+ * Checks if at least one backup session exists in hbase:backup. This API is used by
+ * BackupLogCleaner.
+ * @return true if at least one session exists in the hbase:backup table
+ * @throws IOException exception
+ */
+ public boolean hasBackupSessions() throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("has backup sessions from hbase:backup");
+ }
+ Table table = null;
+ ResultScanner scanner = null;
+ boolean result = false;
+ try {
+ table = connection.getTable(tableName);
+ Scan scan = BackupSystemTableHelper.createScanForBackupHistory();
+ scan.setMaxVersions(1);
+ scan.setCaching(1);
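+ // One row is enough to answer the existence check, so fetch at most one row per RPC.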
+ scanner = table.getScanner(scan);
+ if (scanner.next() != null) {
+ result = true;
+ }
+ return result;
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+}
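
For context, a minimal usage sketch of the API above, based only on the methods shown in this
diff (illustrative, not part of the patch):

    Configuration conf = HBaseConfiguration.create();
    // getTable() lazily creates the hbase:backup table on first use.
    BackupSystemTable backupTable = BackupSystemTable.getTable(conf);
    try {
      String startCode = backupTable.readBackupStartCode();
      if (startCode == null) {
        System.out.println("no successful backup so far");
      }
      // Completed sessions, newest first.
      for (BackupUtil.BackupCompleteData data : backupTable.getBackupHistory()) {
        System.out.println(data.getBackupToken() + " started at " + data.getStartTime());
      }
    } finally {
      BackupSystemTable.close();
    }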
http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupSystemTableHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupSystemTableHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupSystemTableHelper.java
new file mode 100644
index 0000000..bf62a84
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupSystemTableHelper.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+
+
+/**
+ * A collection of methods used by BackupSystemTable.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class BackupSystemTableHelper {
+
+ /**
+ * hbase:backup schema:
+ * 1. Backup sessions rowkey= "session." + backupId; value = serialized
+ * BackupContext
+ * 2. Backup start code rowkey = "startcode"; value = startcode
+ * 3. Incremental backup set rowkey="incrbackupset"; value=[list of tables]
+ * 4. Table-RS-timestamp map rowkey="trslm."+table_name; value = map[RS-> last WAL timestamp]
+ * 5. RS - WAL ts map rowkey="rslogts."+server; value = last WAL timestamp
+ * 6. WALs recorded rowkey="wals."+WAL unique file name; value = backupId
+ */
+ private static final Log LOG = LogFactory.getLog(BackupSystemTableHelper.class);
+
+ private final static String BACKUP_CONTEXT_PREFIX = "session.";
+ private final static String START_CODE_ROW = "startcode";
+ private final static String INCR_BACKUP_SET = "incrbackupset";
+ private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm.";
+ private final static String RS_LOG_TS_PREFIX = "rslogts.";
+ private final static String WALS_PREFIX = "wals.";
+
+ private final static byte[] q0 = "0".getBytes();
+ private final static byte[] EMPTY_VALUE = new byte[] {};
+
+ private BackupSystemTableHelper() {
+ throw new AssertionError("Instantiating utility class...");
+ }
+
+ /**
+ * Creates Put operation for a given backup context object
+ * @param context backup context
+ * @return put operation
+ * @throws IOException exception
+ */
+ static Put createPutForBackupContext(BackupContext context) throws IOException {
+
+ Put put = new Put((BACKUP_CONTEXT_PREFIX + context.getBackupId()).getBytes());
+ put.addColumn(BackupSystemTable.familyName, q0, context.toByteArray());
+ return put;
+ }
+
+ /**
+ * Creates Get operation for a given backup id
+ * @param backupId - backup's ID
+ * @return get operation
+ * @throws IOException exception
+ */
+ static Get createGetForBackupContext(String backupId) throws IOException {
+ Get get = new Get((BACKUP_CONTEXT_PREFIX + backupId).getBytes());
+ get.addFamily(BackupSystemTable.familyName);
+ get.setMaxVersions(1);
+ return get;
+ }
+
+ /**
+ * Creates Delete operation for a given backup id
+ * @param backupId - backup's ID
+ * @return delete operation
+ * @throws IOException exception
+ */
+ public static Delete createDeleteForBackupContext(String backupId) {
+ Delete del = new Delete((BACKUP_CONTEXT_PREFIX + backupId).getBytes());
+ del.addFamily(BackupSystemTable.familyName);
+ return del;
+ }
+
+ /**
+ * Converts Result to BackupContext
+ * @param res - HBase result
+ * @return backup context instance
+ * @throws IOException exception
+ */
+ static BackupContext resultToBackupContext(Result res) throws IOException {
+ res.advance();
+ Cell cell = res.current();
+ return cellToBackupContext(cell);
+ }
+
+ /**
+ * Creates Get operation to retrieve start code from hbase:backup
+ * @return get operation
+ * @throws IOException exception
+ */
+ static Get createGetForStartCode() throws IOException {
+ Get get = new Get(START_CODE_ROW.getBytes());
+ get.addFamily(BackupSystemTable.familyName);
+ get.setMaxVersions(1);
+ return get;
+ }
+
+ /**
+ * Creates Put operation to store start code to hbase:backup
+ * @return put operation
+ * @throws IOException exception
+ */
+ static Put createPutForStartCode(String startCode) {
+ Put put = new Put(START_CODE_ROW.getBytes());
+ // A null start code is stored as a zero-length value, matching the documented contract.
+ byte[] value = startCode == null ? EMPTY_VALUE : startCode.getBytes();
+ put.addColumn(BackupSystemTable.familyName, q0, value);
+ return put;
+ }
+
+ /**
+ * Creates Get to retrieve incremental backup table set from hbase:backup
+ * @return get operation
+ * @throws IOException exception
+ */
+ static Get createGetForIncrBackupTableSet() throws IOException {
+ Get get = new Get(INCR_BACKUP_SET.getBytes());
+ get.addFamily(BackupSystemTable.familyName);
+ get.setMaxVersions(1);
+ return get;
+ }
+
+ /**
+ * Creates Put to store incremental backup table set
+ * @param tables tables
+ * @return put operation
+ */
+ static Put createPutForIncrBackupTableSet(Set<String> tables) {
+ Put put = new Put(INCR_BACKUP_SET.getBytes());
+ for (String table : tables) {
+ put.addColumn(BackupSystemTable.familyName, table.getBytes(), EMPTY_VALUE);
+ }
+ return put;
+ }
+
+ /**
+ * Creates Scan operation to load backup history
+ * @return scan operation
+ */
+ static Scan createScanForBackupHistory() {
+ Scan scan = new Scan();
+ byte[] startRow = BACKUP_CONTEXT_PREFIX.getBytes();
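+ // Prefix scan: the stop row is the prefix with its last byte incremented by one,
+ // an exclusive upper bound that covers exactly the rows starting with the prefix.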
+ byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+ stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+ scan.setStartRow(startRow);
+ scan.setStopRow(stopRow);
+ scan.addFamily(BackupSystemTable.familyName);
+
+ return scan;
+ }
+
+ /**
+ * Converts cell to backup context instance.
+ * @param current - cell
+ * @return backup context instance
+ * @throws IOException exception
+ */
+ static BackupContext cellToBackupContext(Cell current) throws IOException {
+ byte[] data = CellUtil.cloneValue(current);
+ try {
+ BackupContext ctxt = BackupContext.fromByteArray(data);
+ return ctxt;
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Creates Put to write RS last roll log timestamp map
+ * @param table - table
+ * @param smap - map, containing RS:ts
+ * @return put operation
+ */
+ static Put createPutForWriteRegionServerLogTimestamp(String table, String smap) {
+ Put put = new Put((TABLE_RS_LOG_MAP_PREFIX + table).getBytes());
+ put.addColumn(BackupSystemTable.familyName, q0, smap.getBytes());
+ return put;
+ }
+
+ /**
+ * Creates Scan to load table-> { RS -> ts} map of maps
+ * @return scan operation
+ */
+ static Scan createScanForReadLogTimestampMap() {
+ Scan scan = new Scan();
+ byte[] startRow = TABLE_RS_LOG_MAP_PREFIX.getBytes();
+ byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+ stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+ scan.setStartRow(startRow);
+ scan.setStopRow(stopRow);
+ scan.addFamily(BackupSystemTable.familyName);
+
+ return scan;
+ }
+
+ /**
+ * Get table name from rowkey
+ * @param cloneRow rowkey
+ * @return table name
+ */
+ static String getTableNameForReadLogTimestampMap(byte[] cloneRow) {
+ int prefixSize = TABLE_RS_LOG_MAP_PREFIX.length();
+ return new String(cloneRow, prefixSize, cloneRow.length - prefixSize);
+ }
+
+ /**
+ * Creates Put to store RS last log result
+ * @param server - server name
+ * @param fileName - log roll result (timestamp)
+ * @return put operation
+ */
+ static Put createPutForRegionServerLastLogRollResult(String server, String fileName) {
+ Put put = new Put((RS_LOG_TS_PREFIX + server).getBytes());
+ put.addColumn(BackupSystemTable.familyName, q0, fileName.getBytes());
+ return put;
+ }
+
+ /**
+ * Creates Scan operation to load last RS log roll results
+ * @return scan operation
+ */
+ static Scan createScanForReadRegionServerLastLogRollResult() {
+ Scan scan = new Scan();
+ byte[] startRow = RS_LOG_TS_PREFIX.getBytes();
+ byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+ stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+ scan.setStartRow(startRow);
+ scan.setStopRow(stopRow);
+ scan.addFamily(BackupSystemTable.familyName);
+
+ return scan;
+ }
+
+ /**
+ * Get server's name from rowkey
+ * @param row - rowkey
+ * @return server's name
+ */
+ static String getServerNameForReadRegionServerLastLogRollResult(byte[] row) {
+ int prefixSize = RS_LOG_TS_PREFIX.length();
+ return new String(row, prefixSize, row.length - prefixSize);
+ }
+
+ /**
+ * Creates put list for list of WAL files
+ * @param files list of WAL file paths
+ * @param backupId backup id
+ * @return put list
+ * @throws IOException exception
+ */
+ public static List<Put> createPutsForAddWALFiles(List<String> files, String backupId)
+ throws IOException {
+
+ List<Put> puts = new ArrayList<Put>();
+ for (String file : files) {
+ LOG.debug("+++ put: " + BackupUtil.getUniqueWALFileNamePart(file));
+ byte[] row = (WALS_PREFIX + BackupUtil.getUniqueWALFileNamePart(file)).getBytes();
+ Put put = new Put(row);
+ put.addColumn(BackupSystemTable.familyName, q0, backupId.getBytes());
+ puts.add(put);
+ }
+ return puts;
+ }
+
+ /**
+ * Creates Get operation for a given wal file name
+ * @param file file
+ * @return get operation
+ * @throws IOException exception
+ */
+ public static Get createGetForCheckWALFile(String file) throws IOException {
+ byte[] row = (WALS_PREFIX + BackupUtil.getUniqueWALFileNamePart(file)).getBytes();
+ Get get = new Get(row);
+ get.addFamily(BackupSystemTable.familyName);
+ get.setMaxVersions(1);
+ return get;
+ }
+
+}
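
Taken together, the helpers above imply the following hbase:backup row layout (the row keys
below are illustrative; the backup id, table, host and timestamps are invented for the example):

    session.backup_1396650096738   -> serialized BackupContext
    startcode                      -> backup start code (timestamp)
    incrbackupset                  -> one column qualifier per table in the incremental set
    trslm.ns1:t1                   -> host\001timestamp\002host\001timestamp\002...
    rslogts.host1.example.com      -> last WAL roll timestamp for that region server
    wals.<unique WAL file name>    -> backupId of the backup that recorded the file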
http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupUtil.java
new file mode 100644
index 0000000..ff8bd2e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupUtil.java
@@ -0,0 +1,564 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+
+/**
+ * A collection of methods used by multiple classes to back up HBase tables.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class BackupUtil {
+ protected static final Log LOG = LogFactory.getLog(BackupUtil.class);
+
+ public static final String FIELD_SEPARATOR = "\001";
+ public static final String RECORD_SEPARATOR = "\002";
+ public static final String LOGNAME_SEPARATOR = ".";
+ protected static final String HDFS = "hdfs://";
+ protected static Configuration conf = null;
+
+ private BackupUtil(){
+ throw new AssertionError("Instantiating utility class...");
+ }
+
+ /**
+ * Set the configuration from a given one.
+ * @param newConf A new given configuration
+ */
+ public synchronized static void setConf(Configuration newConf) {
+ conf = newConf;
+ }
+
+ /**
+ * Get and merge Hadoop and HBase configuration.
+ * @return the merged configuration
+ */
+ protected static Configuration getConf() {
+ if (conf == null) {
+ conf = new Configuration();
+ HBaseConfiguration.merge(conf, HBaseConfiguration.create());
+ }
+ return conf;
+ }
+
+ /**
+ * Loops through the RS log timestamp map for the tables; for each RS, finds the minimum
+ * timestamp value for that RS across all tables.
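+ * For example, {t1: {rs1: 100, rs2: 200}, t2: {rs1: 50}} yields {rs1: 50, rs2: 200}.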
+ * @param rsLogTimestampMap timestamp map
+ * @return the min timestamp of each RS
+ */
+ protected static HashMap<String, String> getRSLogTimestampMins(
+ HashMap<String, HashMap<String, String>> rsLogTimestampMap) {
+
+ if (rsLogTimestampMap == null || rsLogTimestampMap.isEmpty()) {
+ return null;
+ }
+
+ HashMap<String, String> rsLogTimestamptMins = new HashMap<String, String>();
+ HashMap<String, HashMap<String, String>> rsLogTimestampMapByRS =
+ new HashMap<String, HashMap<String, String>>();
+
+ for (Entry<String, HashMap<String, String>> tableEntry : rsLogTimestampMap.entrySet()) {
+ String table = tableEntry.getKey();
+ HashMap<String, String> rsLogTimestamp = tableEntry.getValue();
+ for (Entry<String, String> rsEntry : rsLogTimestamp.entrySet()) {
+ String rs = rsEntry.getKey();
+ String ts = rsEntry.getValue();
+ if (!rsLogTimestampMapByRS.containsKey(rs)) {
+ rsLogTimestampMapByRS.put(rs, new HashMap<String, String>());
+ rsLogTimestampMapByRS.get(rs).put(table, ts);
+ } else {
+ rsLogTimestampMapByRS.get(rs).put(table, ts);
+ }
+ }
+ }
+
+ for (String rs : rsLogTimestampMapByRS.keySet()) {
+ rsLogTimestamptMins.put(rs, getMinValue(rsLogTimestampMapByRS.get(rs)));
+ }
+
+ return rsLogTimestamptMins;
+ }
+
+ /**
+ * Gets the minimum value among all the values of a map.
+ * @param map map
+ * @return the min value
+ */
+ protected static String getMinValue(HashMap<String, String> map) {
+ String minTimestamp = null;
+ if (map != null) {
+ ArrayList<String> timestampList = new ArrayList<String>(map.values());
+ Collections.sort(timestampList, new Comparator<String>() {
+ @Override
+ public int compare(String s1, String s2) {
+ long l1 = Long.valueOf(s1);
+ long l2 = Long.valueOf(s2);
+ if (l1 > l2) {
+ return 1;
+ } else if (l1 < l2) {
+ return -1;
+ } else {
+ return 0;
+ }
+ }
+ });
+ // The min among all the RS log timestamps will be kept in ZK.
+ minTimestamp = timestampList.get(0);
+ }
+ return minTimestamp;
+ }
+
+ /**
+ * Copies out the table RegionInfo into the incremental backup image. TODO: consider moving
+ * this logic into HBackupFileSystem.
+ * @param backupContext backup context
+ * @param conf configuration
+ * @throws IOException exception
+ * @throws InterruptedException exception
+ */
+ protected static void copyTableRegionInfo(BackupContext backupContext, Configuration conf)
+ throws IOException, InterruptedException {
+
+ Path rootDir = FSUtils.getRootDir(conf);
+ FileSystem fs = rootDir.getFileSystem(conf);
+
+ // for each table in the table set, copy out the table info and region info files in the correct
+ // directory structure
+ for (String table : backupContext.getTables()) {
+
+ LOG.debug("Attempting to copy table info for:" + table);
+ TableDescriptor orig =
+ FSTableDescriptors.getTableDescriptorFromFs(fs, rootDir, TableName.valueOf(table));
+
+ // write a copy of descriptor to the target directory
+ Path target = new Path(backupContext.getBackupStatus(table).getTargetDir());
+ FileSystem targetFs = target.getFileSystem(conf);
+ FSTableDescriptors descriptors =
+ new FSTableDescriptors(conf, targetFs, FSUtils.getRootDir(conf));
+ descriptors.createTableDescriptorForTableDirectory(target, orig, false);
+ LOG.debug("Finished copying tableinfo.");
+
+ HBaseAdmin hbadmin = null;
+ // TODO: optimize
+ Connection conn = null;
+ List<HRegionInfo> regions = null;
+ try {
+ conn = ConnectionFactory.createConnection(conf);
+ hbadmin = (HBaseAdmin) conn.getAdmin();
+ regions = hbadmin.getTableRegions(TableName.valueOf(table));
+ } catch (Exception e) {
+ throw new BackupException(e);
+ } finally {
+ if (hbadmin != null) {
+ hbadmin.close();
+ }
+ if(conn != null){
+ conn.close();
+ }
+ }
+
+ // For each region, write the region info to disk
+ LOG.debug("Starting to write region info for table " + table);
+ for (HRegionInfo regionInfo : regions) {
+ Path regionDir =
+ HRegion.getRegionDir(new Path(backupContext.getBackupStatus(table).getTargetDir()),
+ regionInfo);
+ regionDir =
+ new Path(backupContext.getBackupStatus(table).getTargetDir(), regionDir.getName());
+ writeRegioninfoOnFilesystem(conf, targetFs, regionDir, regionInfo);
+ }
+ LOG.debug("Finished writing region info for table " + table);
+ }
+ }
+
+ /**
+ * Write the .regioninfo file on-disk.
+ */
+ public static void writeRegioninfoOnFilesystem(final Configuration conf, final FileSystem fs,
+ final Path regionInfoDir, HRegionInfo regionInfo) throws IOException {
+ final byte[] content = regionInfo.toDelimitedByteArray();
+ Path regionInfoFile = new Path(regionInfoDir, ".regioninfo");
+ // First check to get the permissions
+ FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
+ // Write the RegionInfo file content
+ FSDataOutputStream out = FSUtils.create(conf, fs, regionInfoFile, perms, null);
+ try {
+ out.write(content);
+ } finally {
+ out.close();
+ }
+ }
+
+ /**
+ * TODO: verify the code
+ * @param p path
+ * @return host name
+ * @throws IOException exception
+ */
+ protected static String parseHostFromOldLog(Path p) throws IOException {
+ String n = p.getName();
+ int idx = n.lastIndexOf(LOGNAME_SEPARATOR);
+ String s = URLDecoder.decode(n.substring(0, idx), "UTF8");
+ return ServerName.parseHostname(s);
+ }
+
+ public static String parseHostNameFromLogFile(Path p) throws IOException {
+ if (isArchivedLogFile(p)) {
+ return parseHostFromOldLog(p);
+ } else {
+ return DefaultWALProvider.getServerNameFromWALDirectoryName(p).getHostname();
+ }
+ }
+
+ private static boolean isArchivedLogFile(Path p) {
+ String oldLog = Path.SEPARATOR + HConstants.HREGION_OLDLOGDIR_NAME + Path.SEPARATOR;
+ return p.toString().contains(oldLog);
+ }
+
+ /**
+ * Return WAL file name
+ * @param walFileName WAL file name
+ * @return WAL file name
+ * @throws IOException exception
+ * @throws IllegalArgumentException exception
+ */
+ public static String getUniqueWALFileNamePart(String walFileName) throws IOException {
+ return new Path(walFileName).getName();
+ }
+
+ /**
+ * Return WAL file name
+ * @param p - WAL file path
+ * @return WAL file name
+ * @throws IOException exception
+ */
+ public static String getUniqueWALFileNamePart(Path p) throws IOException {
+ return p.getName();
+ }
+
+ /**
+ * Given the log file, parse the timestamp from the file name. The timestamp is the last number.
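+ * For example, a WAL named "host%2C16020%2C1396650096738.1396650097621" yields "1396650097621".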
+ * @param p a path to the log file
+ * @return the timestamp
+ * @throws IOException exception
+ */
+ protected static String getCreationTime(Path p, Configuration conf) throws IOException {
+ int idx = p.getName().lastIndexOf(LOGNAME_SEPARATOR);
+ if (idx < 0) {
+ throw new IOException("Cannot parse timestamp from path " + p);
+ }
+ String ts = p.getName().substring(idx + 1);
+ return ts;
+ }
+
+ /**
+ * Get the total length of files under the given directory recursively.
+ * @param fs The hadoop file system
+ * @param dir The target directory
+ * @return the total length of files
+ * @throws IOException exception
+ */
+ public static long getFilesLength(FileSystem fs, Path dir) throws IOException {
+ long totalLength = 0;
+ FileStatus[] files = FSUtils.listStatus(fs, dir);
+ if (files != null) {
+ for (FileStatus fileStatus : files) {
+ if (fileStatus.isDirectory()) {
+ totalLength += getFilesLength(fs, fileStatus.getPath());
+ } else {
+ totalLength += fileStatus.getLen();
+ }
+ }
+ }
+ return totalLength;
+ }
+
+ /**
+ * Keeps the record of dependencies for incremental backup and history info. P.S. we may be
+ * able to merge this class into the BackupImage class later.
+ */
+ public static class BackupCompleteData implements Comparable<BackupCompleteData> {
+ private String startTime;
+ private String endTime;
+ private String type;
+ private String backupRootPath;
+ private String tableList;
+ private String backupToken;
+ private String bytesCopied;
+ private List<String> ancestors;
+ private boolean fromExistingSnapshot = false;
+
+ public List<String> getAncestors() {
+ if (fromExistingSnapshot) {
+ return null;
+ }
+ if (this.ancestors == null) {
+ this.ancestors = new ArrayList<String>();
+ }
+ return this.ancestors;
+ }
+
+ public void addAncestor(String backupToken) {
+ this.getAncestors().add(backupToken);
+ }
+
+ public String getBytesCopied() {
+ return bytesCopied;
+ }
+
+ public void setBytesCopied(String bytesCopied) {
+ this.bytesCopied = bytesCopied;
+ }
+
+ public String getBackupToken() {
+ return backupToken;
+ }
+
+ public void setBackupToken(String backupToken) {
+ this.backupToken = backupToken;
+ }
+
+ public String getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(String startTime) {
+ this.startTime = startTime;
+ }
+
+ public String getEndTime() {
+ return endTime;
+ }
+
+ public void setEndTime(String endTime) {
+ this.endTime = endTime;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getBackupRootPath() {
+ return backupRootPath;
+ }
+
+ public void setBackupRootPath(String backupRootPath) {
+ this.backupRootPath = backupRootPath;
+ }
+
+ public String getTableList() {
+ return tableList;
+ }
+
+ public void setTableList(String tableList) {
+ this.tableList = tableList;
+ }
+
+ public boolean fromExistingSnapshot() {
+ return this.fromExistingSnapshot;
+ }
+
+ public void markFromExistingSnapshot() {
+ this.fromExistingSnapshot = true;
+ }
+
+ @Override
+ public int compareTo(BackupCompleteData o) {
+ Long thisTS =
+ Long.valueOf(this.getBackupToken().substring(this.getBackupToken().lastIndexOf("_") + 1));
+ Long otherTS =
+ Long.valueOf(o.getBackupToken().substring(o.getBackupToken().lastIndexOf("_") + 1));
+ return thisTS.compareTo(otherTS);
+ }
+
+ }
+
+ /**
+ * Sort history list by start time in descending order.
+ * @param historyList history list
+ * @return sorted list of BackupCompleteData
+ */
+ public static ArrayList<BackupCompleteData> sortHistoryListDesc(
+ ArrayList<BackupCompleteData> historyList) {
+ ArrayList<BackupCompleteData> list = new ArrayList<BackupCompleteData>();
+ TreeMap<String, BackupCompleteData> map = new TreeMap<String, BackupCompleteData>();
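+ // Note: the start time is the map key, so two backups with an identical start time
+ // would collapse into a single entry.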
+ for (BackupCompleteData h : historyList) {
+ map.put(h.getStartTime(), h);
+ }
+ Iterator<String> i = map.descendingKeySet().iterator();
+ while (i.hasNext()) {
+ list.add(map.get(i.next()));
+ }
+ return list;
+ }
+
+ /**
+ * Get list of all WAL files (WALs and archive)
+ * @param c - configuration
+ * @return list of WAL files
+ * @throws IOException exception
+ */
+ public static List<String> getListOfWALFiles(Configuration c) throws IOException {
+ Path rootDir = FSUtils.getRootDir(c);
+ Path logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
+ Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ List<String> logFiles = new ArrayList<String>();
+
+ FileSystem fs = FileSystem.get(c);
+ logFiles = getFiles(fs, logDir, logFiles, null);
+ logFiles = getFiles(fs, oldLogDir, logFiles, null);
+ return logFiles;
+ }
+
+ /**
+ * Get list of all WAL files (WALs and archive) accepted by the given filter
+ * @param c - configuration
+ * @param filter - path filter
+ * @return list of WAL files
+ * @throws IOException exception
+ */
+ public static List<String> getListOfWALFiles(Configuration c, PathFilter filter)
+ throws IOException {
+ Path rootDir = FSUtils.getRootDir(c);
+ Path logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
+ Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ List<String> logFiles = new ArrayList<String>();
+
+ FileSystem fs = FileSystem.get(c);
+ logFiles = getFiles(fs, logDir, logFiles, filter);
+ logFiles = getFiles(fs, oldLogDir, logFiles, filter);
+ return logFiles;
+ }
+
+ /**
+ * Get list of all old WAL files (WALs and archive)
+ * @param c - configuration
+ * @param hostTimestampMap - map of host name to the last backed-up WAL timestamp
+ * @return list of WAL files
+ * @throws IOException exception
+ */
+ public static List<String> getWALFilesOlderThan(final Configuration c,
+ final HashMap<String, String> hostTimestampMap) throws IOException {
+ Path rootDir = FSUtils.getRootDir(c);
+ Path logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
+ Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ List<String> logFiles = new ArrayList<String>();
+
+ PathFilter filter = new PathFilter() {
+
+ @Override
+ public boolean accept(Path p) {
+ try {
+ if (DefaultWALProvider.isMetaFile(p)) {
+ return false;
+ }
+ String host = BackupUtil.parseHostNameFromLogFile(p);
+ String oldTimestamp = hostTimestampMap.get(host);
+ String currentLogTS = getCreationTime(p, c);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("path=" + p);
+ LOG.debug("oldTimestamp=" + oldTimestamp);
+ LOG.debug("currentLogTS=" + currentLogTS);
+ }
+ return Long.parseLong(currentLogTS) <= Long.parseLong(oldTimestamp);
+ } catch (IOException e) {
+ LOG.error(e);
+ return false;
+ }
+ }
+ };
+ FileSystem fs = FileSystem.get(c);
+ logFiles = getFiles(fs, logDir, logFiles, filter);
+ logFiles = getFiles(fs, oldLogDir, logFiles, filter);
+ return logFiles;
+ }
+
+ private static List<String> getFiles(FileSystem fs, Path rootDir, List<String> files,
+ PathFilter filter) throws FileNotFoundException, IOException {
+
+ RemoteIterator<LocatedFileStatus> it = fs.listFiles(rootDir, true);
+
+ while (it.hasNext()) {
+ LocatedFileStatus lfs = it.next();
+ if (lfs.isDirectory()) {
+ continue;
+ }
+ // apply filter; a null filter accepts all files
+ if (filter == null || filter.accept(lfs.getPath())) {
+ files.add(lfs.getPath().toString());
+ LOG.info(lfs.getPath());
+ }
+ }
+ return files;
+ }
+
+ public static String concat(Collection<String> col, String separator) {
+ if (col.size() == 0) {
+ return "";
+ }
+ StringBuilder sb = new StringBuilder();
+ for (String s : col) {
+ sb.append(s + separator);
+ }
+ // strip the trailing separator
+ sb.setLength(sb.length() - separator.length());
+ return sb.toString();
+ }
+}
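
For context, a minimal sketch of how these WAL helpers compose with BackupSystemTable, written
as if from code inside the org.apache.hadoop.hbase.backup package (getRSLogTimestampMins is
protected/package-visible); illustrative only:

    Configuration conf = HBaseConfiguration.create();
    BackupSystemTable backupTable = BackupSystemTable.getTable(conf);
    // Collapse the per-table {RS -> ts} maps into the minimum timestamp per region server.
    HashMap<String, String> rsMins =
        BackupUtil.getRSLogTimestampMins(backupTable.readLogTimestampMap());
    // WALs whose creation timestamp is at or below that minimum are already covered
    // by the last backup and are candidates for cleanup.
    List<String> oldLogs = BackupUtil.getWALFilesOlderThan(conf, rsMins);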
http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
new file mode 100644
index 0000000..74411da
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
@@ -0,0 +1,511 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * View to an on-disk Backup Image FileSystem.
+ * Provides the set of methods necessary to interact with the on-disk Backup Image data.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class HBackupFileSystem {
+ public static final Log LOG = LogFactory.getLog(HBackupFileSystem.class);
+
+ private final String RESTORE_TMP_PATH = "/tmp/restoreTemp";
+ private final String[] ignoreDirs = { "recovered.edits" };
+
+ private final Configuration conf;
+ private final FileSystem fs;
+ private final Path backupRootPath;
+ private final String backupId;
+
+ /**
+ * Create a view to the on-disk Backup Image.
+ * @param conf to use
+ * @param backupRootPath root path where the backup Image is stored
+ * @param backupId that represents the backup Image
+ */
+ HBackupFileSystem(final Configuration conf, final Path backupRootPath, final String backupId)
+ throws IOException {
+ this.conf = conf;
+ this.fs = backupRootPath.getFileSystem(conf);
+ this.backupRootPath = backupRootPath;
+ this.backupId = backupId; // the backup ID for the lead backup Image
+ }
+
+ /**
+ * @param tableName the table that was backed up
+ * @return {@link HTableDescriptor} saved in backup image of the table
+ */
+ protected HTableDescriptor getTableDesc(String tableName) throws FileNotFoundException,
+ IOException {
+
+ Path tableInfoPath = this.getTableInfoPath(tableName);
+ LOG.debug("tableInfoPath = " + tableInfoPath.toString());
+ SnapshotDescription desc = SnapshotDescriptionUtils.readSnapshotInfo(fs, tableInfoPath);
+ LOG.debug("desc = " + desc.getName());
+ SnapshotManifest manifest = SnapshotManifest.open(conf, fs, tableInfoPath, desc);
+ HTableDescriptor tableDescriptor = manifest.getTableDescriptor();
+ /*
+ * for HBase 0.96 or 0.98 HTableDescriptor tableDescriptor =
+ * FSTableDescriptors.getTableDescriptorFromFs(fs, tableInfoPath);
+ */
+ if (!tableDescriptor.getNameAsString().equals(tableName)) {
+ LOG.error("couldn't find Table Desc for table: " + tableName + " under tableInfoPath: "
+ + tableInfoPath.toString());
+ LOG.error("tableDescriptor.getNameAsString() = " + tableDescriptor.getNameAsString());
+ }
+ return tableDescriptor;
+ }
+
+ /**
+ * Given the backup root dir, backup id and the table name, return the backup image location,
+ * which is also where the backup manifest file is located. The return value looks like:
+ * "hdfs://backup.hbase.org:9000/user/biadmin/backup1/default/t1_dn/backup_1396650096738"
+ * @param backupRootDir backup root directory
+ * @param backupId backup id
+ * @param table table name
+ * @return backupPath String for the particular table
+ */
+ protected static String getTableBackupDir(String backupRootDir, String backupId, String table) {
+ TableName tableName = TableName.valueOf(table);
+ return backupRootDir + File.separator + tableName.getNamespaceAsString() + File.separator
+ + tableName.getQualifierAsString() + File.separator + backupId;
+ }
+
+ /**
+ * Given the backup root dir, backup id and the table name, return the backup image location,
+ * which is also where the backup manifest file is located. The return value looks like:
+ * "hdfs://backup.hbase.org:9000/user/biadmin/backup1/default/t1_dn/backup_1396650096738"
+ * @param tableN table name
+ * @return backupPath for the particular table
+ */
+ protected Path getTableBackupPath(String tableN) {
+ TableName tableName = TableName.valueOf(tableN);
+ return new Path(this.backupRootPath, tableName.getNamespaceAsString() + File.separator
+ + tableName.getQualifierAsString() + File.separator + backupId);
+ }
+
+ /**
+ * The return value represents the path:
+ * ".../user/biadmin/backup1/default/t1_dn/backup_1396650096738/.hbase-snapshot"
+ * @param tableName table name
+ * @return path for snapshot
+ */
+ protected Path getTableSnapshotPath(String tableName) {
+ return new Path(this.getTableBackupPath(tableName), HConstants.SNAPSHOT_DIR_NAME);
+ }
+
+ /**
+ * The return value represents the path:
+ * "..../default/t1_dn/backup_1396650096738/.hbase-snapshot/snapshot_1396650097621_default_t1_dn"
+ * This path contains .snapshotinfo and .tabledesc (0.96 and 0.98), or .snapshotinfo and
+ * .data.manifest (trunk).
+ * @param tableName table name
+ * @return path to table info
+ * @throws FileNotFoundException exception
+ * @throws IOException exception
+ */
+ protected Path getTableInfoPath(String tableName) throws FileNotFoundException, IOException {
+
+ Path tableSnapShotPath = this.getTableSnapshotPath(tableName);
+ Path tableInfoPath = null;
+
+ // can't build the path directly as the timestamp values are different
+ FileStatus[] snapshots = fs.listStatus(tableSnapShotPath);
+ for (FileStatus snapshot : snapshots) {
+ tableInfoPath = snapshot.getPath();
+ // SnapshotManifest.DATA_MANIFEST_NAME = "data.manifest";
+ if (tableInfoPath.getName().endsWith("data.manifest")) {
+ LOG.debug("find Snapshot Manifest");
+ break;
+ }
+ }
+ return tableInfoPath;
+ }
+
+ /**
+ * The return value represents the path:
+ * ".../user/biadmin/backup1/default/t1_dn/backup_1396650096738/archive/data/default/t1_dn"
+ * @param tableName table name
+ * @return path to table archive
+ * @throws IOException exception
+ */
+ protected Path getTableArchivePath(String tableName) throws IOException {
+ Path baseDir = new Path(getTableBackupPath(tableName), HConstants.HFILE_ARCHIVE_DIRECTORY);
+ Path dataDir = new Path(baseDir, HConstants.BASE_NAMESPACE_DIR);
+ Path archivePath = new Path(dataDir, TableName.valueOf(tableName).getNamespaceAsString());
+ Path tableArchivePath =
+ new Path(archivePath, TableName.valueOf(tableName).getQualifierAsString());
+ if (!fs.exists(tableArchivePath) || !fs.getFileStatus(tableArchivePath).isDirectory()) {
+ LOG.debug("Folder tableArchivePath: " + tableArchivePath.toString() + " does not exists");
+ tableArchivePath = null; // empty table has no archive
+ }
+ return tableArchivePath;
+ }
+
+ /**
+ * Given the backup root dir and the backup id, return the log file location for an incremental
+ * backup.
+ * @param backupRootDir backup root directory
+ * @param backupId backup id
+ * @return logBackupDir: ".../user/biadmin/backup1/WALs/backup_1396650096738"
+ */
+ protected static String getLogBackupDir(String backupRootDir, String backupId) {
+ return backupRootDir + File.separator + HConstants.HREGION_LOGDIR_NAME + File.separator
+ + backupId;
+ }
+
+ protected static Path getLogBackupPath(String backupRootDir, String backupId) {
+ return new Path(getLogBackupDir(backupRootDir, backupId));
+ }
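+
+ /*
+ * A minimal sketch of what the two helpers above produce (host and user are illustrative):
+ *
+ * getLogBackupDir("hdfs://host:9000/user/biadmin/backup1", "backup_1396650096738")
+ * // -> "hdfs://host:9000/user/biadmin/backup1/WALs/backup_1396650096738"
+ */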
+
+ private Path getManifestPath(String tableName) throws IOException {
+ Path manifestPath = new Path(getTableBackupPath(tableName), BackupManifest.FILE_NAME);
+
+ LOG.debug("Looking for " + manifestPath.toString());
+ if (!fs.exists(manifestPath)) {
+ // check log dir for incremental backup case
+ manifestPath =
+ new Path(getLogBackupDir(this.backupRootPath.toString(), this.backupId) + File.separator
+ + BackupManifest.FILE_NAME);
+ LOG.debug("Looking for " + manifestPath.toString());
+ if (!fs.exists(manifestPath)) {
+ String errorMsg =
+ "Could not find backup manifest for " + backupId + " in " + backupRootPath.toString();
+ throw new IOException(errorMsg);
+ }
+ }
+ return manifestPath;
+ }
+
+ protected BackupManifest getManifest(String tableName) throws IOException {
+ BackupManifest manifest = new BackupManifest(conf, this.getManifestPath(tableName));
+ return manifest;
+ }
+
+ /**
+ * Gets region list
+ * @param tableName table name
+ * @return RegionList region list
+ * @throws FileNotFoundException exception
+ * @throws IOException exception
+ */
+ protected ArrayList<Path> getRegionList(String tableName) throws FileNotFoundException,
+ IOException {
+ Path tableArchivePath = this.getTableArchivePath(tableName);
+ // delegate to the Path-based overload below to avoid duplicating the listing logic
+ return getRegionList(tableArchivePath);
+ }
+
+ /**
+ * Gets region list
+ * @param tableArchivePath table archive path
+ * @return RegionList region list
+ * @throws FileNotFoundException exception
+ * @throws IOException exception
+ */
+ protected ArrayList<Path> getRegionList(Path tableArchivePath) throws FileNotFoundException,
+ IOException {
+ ArrayList<Path> regionDirList = new ArrayList<Path>();
+ FileStatus[] children = fs.listStatus(tableArchivePath);
+ for (FileStatus childStatus : children) {
+ // each child corresponds to one region directory
+ Path child = childStatus.getPath();
+ regionDirList.add(child);
+ }
+ return regionDirList;
+ }
+
+ /**
+ * Counts the files (i.e. HFiles) under each region directory of a table archive and finds
+ * the maximum count for a single region.
+ * @param tableArchivePath archive path
+ * @return the maximum number of files found in one region directory
+ * @throws IOException exception
+ */
+ protected int getMaxNumberOfFilesInSubDir(Path tableArchivePath) throws IOException {
+ int result = 1;
+ ArrayList<Path> regionPathList = this.getRegionList(tableArchivePath);
+
+ if (regionPathList == null || regionPathList.size() == 0) {
+ throw new IllegalStateException("Cannot restore hbase table because directory '"
+ + tableArchivePath + "' is not a directory.");
+ }
+
+ for (Path regionPath : regionPathList) {
+ result = Math.max(result, getNumberOfFilesInDir(regionPath));
+ }
+ return result;
+ }
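+
+ /*
+ * A minimal usage sketch (the table name is illustrative; note that getTableArchivePath()
+ * may return null for an empty table, see above):
+ *
+ * Path archive = getTableArchivePath("default:t1_dn");
+ * int maxHFiles = (archive == null) ? 0 : getMaxNumberOfFilesInSubDir(archive);
+ */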
+
+ /**
+ * Counts the number of files (i.e. HFiles) under an HBase region directory.
+ * @param regionPath path to a region directory
+ * @return the total number of files found under its subdirectories
+ * @throws IOException exception
+ */
+ protected int getNumberOfFilesInDir(Path regionPath) throws IOException {
+ int result = 0;
+
+ if (!fs.exists(regionPath) || !fs.getFileStatus(regionPath).isDirectory()) {
+ throw new IllegalStateException("Cannot restore hbase table because directory '"
+ + regionPath.toString() + "' is not a directory.");
+ }
+
+ FileStatus[] tableDirContent = fs.listStatus(regionPath);
+ for (FileStatus subDirStatus : tableDirContent) {
+ FileStatus[] colFamilies = fs.listStatus(subDirStatus.getPath());
+ for (FileStatus colFamilyStatus : colFamilies) {
+ FileStatus[] colFamilyContent = fs.listStatus(colFamilyStatus.getPath());
+ result += colFamilyContent.length;
+ }
+ }
+ return result;
+ }
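+
+ /*
+ * Directory layout assumed by the two counting helpers above (names are illustrative):
+ *
+ * <tableArchivePath>/<region>/<family>/<hfile>
+ *
+ * getMaxNumberOfFilesInSubDir() iterates the <region> level and takes the maximum of
+ * getNumberOfFilesInDir(), which descends from the path it is given and counts the entries
+ * found at the <hfile> level (listing a plain file simply yields the file itself).
+ */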
+
+ /**
+ * Duplicate the backup image if it is on the local cluster, because the bulk load that
+ * follows moves HFiles instead of copying them and would otherwise modify the backup image.
+ * @see HStore#bulkLoadHFile(String, long)
+ * @see HRegionFileSystem#bulkLoadStoreFile(String familyName, Path srcPath, long seqNum)
+ * @param tableArchivePath archive path
+ * @return the new tableArchivePath
+ * @throws IOException exception
+ */
+ protected Path checkLocalAndBackup(Path tableArchivePath) throws IOException {
+ // Move the file if it's on local cluster
+ boolean isCopyNeeded = false;
+
+ FileSystem srcFs = tableArchivePath.getFileSystem(conf);
+ FileSystem desFs = FileSystem.get(conf);
+ if (tableArchivePath.toString().startsWith("/")) { // scheme-less path is on the local cluster
+ isCopyNeeded = true;
+ } else {
+ // This should match what is done in @see HRegionFileSystem#bulkLoadStoreFile(String, Path,
+ // long)
+ if (srcFs.getUri().equals(desFs.getUri())) {
+ LOG.debug("cluster hold the backup image: " + srcFs.getUri() + "; local cluster node: "
+ + desFs.getUri());
+ isCopyNeeded = true;
+ }
+ }
+ if (isCopyNeeded) {
+ LOG.debug("File " + tableArchivePath + " on local cluster, back it up before restore");
+ Path tmpPath = new Path(RESTORE_TMP_PATH);
+ if (desFs.exists(tmpPath)) {
+ try {
+ desFs.delete(tmpPath, true);
+ } catch (IOException e) {
+ LOG.debug("Failed to delete path: " + tmpPath
+ + ", need to check whether restore target DFS cluster is healthy");
+ }
+ }
+ FileUtil.copy(srcFs, tableArchivePath, desFs, tmpPath, false, conf);
+ LOG.debug("Copied to temporary path on local cluster: " + tmpPath);
+ tableArchivePath = tmpPath;
+ }
+ return tableArchivePath;
+ }
+
+ /**
+ * Calculate region boundary keys from the HFiles found under the given region directories.
+ * @param regionDirList region dir list
+ * @return a sorted array of boundary keys inferred from the first/last row key of each HFile
+ */
+ protected byte[][] generateBoundaryKeys(ArrayList<Path> regionDirList)
+ throws FileNotFoundException, IOException {
+ TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+ // Build a set of keys to store the boundaries
+ byte[][] keys = null;
+ // calculate region boundaries from the first and last row keys of every HFile
+ for (Path regionDir : regionDirList) {
+ LOG.debug("Parsing region dir: " + regionDir);
+ Path hfofDir = regionDir;
+
+ if (!fs.exists(hfofDir)) {
+ LOG.warn("HFileOutputFormat dir " + hfofDir + " not found");
+ }
+
+ FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
+ if (familyDirStatuses == null) {
+ throw new IOException("No families found in " + hfofDir);
+ }
+
+ for (FileStatus stat : familyDirStatuses) {
+ if (!stat.isDirectory()) {
+ LOG.warn("Skipping non-directory " + stat.getPath());
+ continue;
+ }
+ boolean isIgnore = false;
+ String pathName = stat.getPath().getName();
+ for (String ignore : ignoreDirs) {
+ if (pathName.contains(ignore)) {
+ LOG.warn("Skipping non-family directory" + pathName);
+ isIgnore = true;
+ break;
+ }
+ }
+ if (isIgnore) {
+ continue;
+ }
+ Path familyDir = stat.getPath();
+ LOG.debug("Parsing family dir [" + familyDir.toString() + " in region [" + regionDir + "]");
+ // Skip _logs, etc
+ if (familyDir.getName().startsWith("_") || familyDir.getName().startsWith(".")) {
+ continue;
+ }
+
+ // start to parse hfile inside one family dir
+ Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
+ for (Path hfile : hfiles) {
+ if (hfile.getName().startsWith("_") || hfile.getName().startsWith(".")
+ || StoreFileInfo.isReference(hfile.getName())
+ || HFileLink.isHFileLink(hfile.getName())) {
+ continue;
+ }
+ HFile.Reader reader = HFile.createReader(fs, hfile, new CacheConfig(conf), conf);
+ final byte[] first, last;
+ try {
+ reader.loadFileInfo();
+ first = reader.getFirstRowKey();
+ last = reader.getLastRowKey();
+ LOG.debug("Trying to figure out region boundaries hfile=" + hfile + " first="
+ + Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));
+
+ // To eventually infer start key-end key boundaries
+ Integer value = map.containsKey(first) ? (Integer) map.get(first) : 0;
+ map.put(first, value + 1);
+ value = map.containsKey(last) ? (Integer) map.get(last) : 0;
+ map.put(last, value - 1);
+ } finally {
+ reader.close();
+ }
+ }
+ }
+ }
+ keys = LoadIncrementalHFiles.inferBoundaries(map);
+ return keys;
+ }
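+
+ /*
+ * A minimal usage sketch (table name, admin and htd are illustrative and assumed to be in
+ * scope): the inferred keys are suitable as split keys when re-creating a table before
+ * bulk-loading the backed-up HFiles:
+ *
+ * byte[][] splitKeys = generateBoundaryKeys(getRegionList("default:t1_dn"));
+ * admin.createTable(htd, splitKeys);
+ */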
+
+ /**
+ * Check whether the backup path exists.
+ * @param backupStr backup path string
+ * @param conf configuration
+ * @return true if the path exists
+ * @throws IOException exception
+ */
+ protected static boolean checkPathExist(String backupStr, Configuration conf)
+ throws IOException {
+ Path backupPath = new Path(backupStr);
+ FileSystem fileSys = backupPath.getFileSystem(conf);
+ String targetFsScheme = fileSys.getUri().getScheme();
+ LOG.debug("Scheme of given URL: " + backupStr + " is: " + targetFsScheme);
+ return fileSys.exists(backupPath);
+ }
+
+ /**
+ * Check that the backup image path exists for each table and that a manifest file is present.
+ * @param backupManifestMap If all the manifests are found, then they are put into this map
+ * @param tableArray the tables involved
+ * @throws IOException exception
+ */
+ protected void checkImageManifestExist(HashMap<String, BackupManifest> backupManifestMap,
+ String[] tableArray) throws IOException {
+
+ try {
+ for (String tableName : tableArray) {
+ BackupManifest manifest = this.getManifest(tableName);
+ backupManifestMap.put(tableName, manifest);
+ }
+ } catch (IOException e) {
+ String expMsg = e.getMessage();
+ if (expMsg.contains("No FileSystem for scheme")) {
+ if (expMsg.contains("gpfs")) {
+ LOG.error("Please change to use webhdfs url when "
+ + "the backup image to restore locates on gpfs cluster");
+ } else {
+ LOG.error("Unsupported filesystem scheme found in the backup target url, "
+ + "please check the url to make sure no typo in it");
+ }
+ throw e;
+ } else if (expMsg.contains("no authority supported")) {
+ LOG.error("Please change to use webhdfs url when "
+ + "the backup image to restore locates on gpfs cluster");
+ throw e;
+ } else {
+ LOG.error(expMsg);
+ throw e;
+ }
+ }
+ }
+
+ public static String join(String[] names) {
+ StringBuilder sb = new StringBuilder();
+ String sep = BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND;
+ for (String s : names) {
+ sb.append(sep).append(s);
+ }
+ return sb.toString();
+ }
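+
+ /*
+ * Note that join() prepends the delimiter before every element, including the first, so
+ * (assuming the delimiter is ","): join(new String[] { "t1", "t2" }) -> ",t1,t2".
+ * Callers must tolerate or strip the leading delimiter.
+ */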
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalBackupManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalBackupManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalBackupManager.java
new file mode 100644
index 0000000..e91857f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalBackupManager.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+
+/**
+ * Once a full backup has been created, an incremental backup stores only the changes made
+ * since the last full or incremental backup.
+ *
+ * Creating the backup copies the log files from .logs and .oldlogs that were written since
+ * the last backup timestamp.
+ *
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class IncrementalBackupManager {
+ // parent manager
+ private BackupManager backupManager;
+
+ public static final Log LOG = LogFactory.getLog(IncrementalBackupManager.class);
+
+ public IncrementalBackupManager(BackupManager bm) {
+ this.backupManager = bm;
+ }
+
+ /**
+ * Obtain the list of logs that need to be copied out for this incremental backup. The list is set
+ * in BackupContext.
+ * @param backupContext backup context
+ * @return The new HashMap of RS log timestamps after the log roll for this incremental backup.
+ * @throws IOException exception
+ */
+ public HashMap<String, String> getIncrBackupLogFileList(BackupContext backupContext)
+ throws IOException {
+ List<String> logList;
+ HashMap<String, String> newTimestamps;
+ HashMap<String, String> previousTimestampMins;
+
+ Configuration conf = BackupUtil.getConf();
+ String savedStartCode = backupManager.readBackupStartCode();
+
+ // key: tableName
+ // value: <RegionServer,PreviousTimeStamp>
+ HashMap<String, HashMap<String, String>> previousTimestampMap =
+ backupManager.readLogTimestampMap();
+
+ previousTimestampMins = BackupUtil.getRSLogTimestampMins(previousTimestampMap);
+
+ LOG.debug("StartCode " + savedStartCode + "for backupID " + backupContext.getBackupId());
+ LOG.debug("Timestamps " + previousTimestampMap);
+ // get all new log files from .logs and .oldlogs after last TS and before new timestamp
+ if (savedStartCode == null ||
+ previousTimestampMins == null ||
+ previousTimestampMins.isEmpty()) {
+ throw new IOException("Cannot read any previous back up timestamps from hbase:backup. "
+ + "In order to create an incremental backup, at least one full backup is needed.");
+ }
+
+ HBaseAdmin hbadmin = null;
+ Connection conn = null;
+ try {
+ LOG.info("Execute roll log procedure for incremental backup ...");
+ conn = ConnectionFactory.createConnection(conf);
+ hbadmin = (HBaseAdmin) conn.getAdmin();
+ hbadmin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
+ LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, new HashMap<String, String>());
+ } finally {
+ if (hbadmin != null) {
+ hbadmin.close();
+ }
+ if (conn != null) {
+ conn.close();
+ }
+ }
+
+ newTimestamps = backupManager.readRegionServerLastLogRollResult();
+
+ logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
+
+ backupContext.setIncrBackupFileList(logList);
+
+ return newTimestamps;
+ }
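+
+ /*
+ * A minimal calling sketch (backupManager and backupContext are assumed to be initialized
+ * by the caller):
+ *
+ * IncrementalBackupManager mgr = new IncrementalBackupManager(backupManager);
+ * HashMap<String, String> newTimestamps = mgr.getIncrBackupLogFileList(backupContext);
+ * // backupContext now carries the WAL files to copy; newTimestamps holds the
+ * // per-region-server log roll results to persist after a successful backup.
+ */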
+
+ /**
+ * For each region server: get all log files newer than the last timestamps but not newer than the
+ * newest timestamps.
+ * @param olderTimestamps the timestamp for each region server of the last backup.
+ * @param newestTimestamps the timestamp for each region server that the backup should lead to.
+ * @param conf the Hadoop and HBase configuration
+ * @param savedStartCode the startcode (timestamp) of last successful backup.
+ * @return a list of log files to be backed up
+ * @throws IOException exception
+ */
+ private List<String> getLogFilesForNewBackup(HashMap<String, String> olderTimestamps,
+ HashMap<String, String> newestTimestamps, Configuration conf, String savedStartCode)
+ throws IOException {
+ LOG.debug("In getLogFilesForNewBackup()\n" + "olderTimestamps: " + olderTimestamps
+ + "\n newestTimestamps: " + newestTimestamps);
+ Path rootdir = FSUtils.getRootDir(conf);
+ Path logDir = new Path(rootdir, HConstants.HREGION_LOGDIR_NAME);
+ Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
+ FileSystem fs = rootdir.getFileSystem(conf);
+ NewestLogFilter pathFilter = new NewestLogFilter(conf);
+
+ List<String> resultLogFiles = new ArrayList<String>();
+ List<String> newestLogs = new ArrayList<String>();
+
+ /*
+ * The region server and timestamp info we keep in hbase:backup may be out of sync with the
+ * cluster if a new region server was added or an existing one was lost. We deal with that
+ * here when processing the logs. If hbase:backup lists more hosts than the .logs directory,
+ * the extras are simply ignored. If the .logs directory includes more hosts, the additional
+ * hosts have no old timestamps to compare with, so all of their logs are used. We always
+ * write up-to-date region server and timestamp info to hbase:backup at the end of a
+ * successful backup.
+ */
+
+ FileStatus[] rss;
+ Path p;
+ String host;
+ String oldTimeStamp;
+ String currentLogFile;
+ String currentLogTS;
+
+ // Get the files in .logs.
+ rss = fs.listStatus(logDir);
+ for (FileStatus rs : rss) {
+ p = rs.getPath();
+ host = DefaultWALProvider.getServerNameFromWALDirectoryName(p).getHostname();
+ FileStatus[] logs;
+ oldTimeStamp = olderTimestamps.get(host);
+ // It is possible that there is no old timestamp in hbase:backup for this host if
+ // this region server is newly added after our last backup.
+ if (oldTimeStamp == null) {
+ logs = fs.listStatus(p);
+ } else {
+ pathFilter.setLastBackupTS(oldTimeStamp);
+ logs = fs.listStatus(p, pathFilter);
+ }
+ for (FileStatus log : logs) {
+ LOG.debug("currentLogFile: " + log.getPath().toString());
+ if (DefaultWALProvider.isMetaFile(log.getPath())) {
+ LOG.debug("Skip hbase:meta log file: " + log.getPath().getName());
+ continue;
+ }
+ currentLogFile = log.getPath().toString();
+ resultLogFiles.add(currentLogFile);
+ currentLogTS = BackupUtil.getCreationTime(log.getPath(), conf);
+ // newestTimestamps is up-to-date with the current list of hosts
+ // so newestTimestamps.get(host) will not be null.
+ if (Long.valueOf(currentLogTS) > Long.valueOf(newestTimestamps.get(host))) {
+ newestLogs.add(currentLogFile);
+ }
+ }
+ }
+
+ // Include the .oldlogs files too.
+ FileStatus[] oldlogs = fs.listStatus(oldLogDir);
+ for (FileStatus oldlog : oldlogs) {
+ p = oldlog.getPath();
+ currentLogFile = p.toString();
+ if (DefaultWALProvider.isMetaFile(p)) {
+ LOG.debug("Skip .meta log file: " + currentLogFile);
+ continue;
+ }
+ host = BackupUtil.parseHostFromOldLog(p);
+ currentLogTS = BackupUtil.getCreationTime(p, conf);
+ oldTimeStamp = olderTimestamps.get(host);
+ /*
+ * It is possible that there is no old timestamp in hbase:backup for this host: at the time
+ * of our last backup operation, this RS did not exist. The reason can be one of two:
+ * 1. The RS already left/crashed, and its logs were moved to .oldlogs.
+ * 2. The RS was added after our last backup.
+ */
+ if (oldTimeStamp == null) {
+ if (Long.valueOf(currentLogTS) < Long.valueOf(savedStartCode)) {
+ // This log predates our last backup start code, so its edits are already covered.
+ continue;
+ } else {
+ resultLogFiles.add(currentLogFile);
+ }
+ } else if (Long.valueOf(currentLogTS) > Long.valueOf(oldTimeStamp)) {
+ resultLogFiles.add(currentLogFile);
+ }
+
+ LOG.debug("resultLogFiles before removal of newestLogs: " + resultLogFiles);
+ // It is possible that a host in .oldlogs is an obsolete region server
+ // so newestTimestamps.get(host) here can be null.
+ // Even if these logs belong to an obsolete region server, we still need
+ // to include them to avoid losing edits in the backup.
+ String newTimestamp = newestTimestamps.get(host);
+ if (newTimestamp != null && Long.valueOf(currentLogTS) > Long.valueOf(newTimestamp)) {
+ newestLogs.add(currentLogFile);
+ }
+ }
+ LOG.debug("newestLogs: " + newestLogs);
+ // remove newest log per host because they are still in use
+ resultLogFiles.removeAll(newestLogs);
+ LOG.debug("resultLogFiles after removal of newestLogs: " + resultLogFiles);
+ return resultLogFiles;
+ }
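+
+ /*
+ * A worked example of the selection rule above (timestamps are illustrative): for host
+ * "rs1" with oldTimeStamp=100 and newest timestamp=200, a WAL created at 150 is backed
+ * up, one created at 90 is skipped (already covered by the previous backup), and one
+ * created at 250 is excluded as still in use.
+ */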
+
+ class NewestLogFilter implements PathFilter {
+ private String lastBackupTS = "0";
+ final private Configuration conf;
+
+ public NewestLogFilter(Configuration conf) {
+ this.conf = conf;
+ }
+
+ protected void setLastBackupTS(String ts) {
+ this.lastBackupTS = ts;
+ }
+
+ @Override
+ public boolean accept(Path path) {
+ // skip meta table log -- ts.meta file
+ if (DefaultWALProvider.isMetaFile(path)) {
+ LOG.debug("Skip .meta log file: " + path.getName());
+ return false;
+ }
+ String timestamp;
+ try {
+ timestamp = BackupUtil.getCreationTime(path, conf);
+ return Long.valueOf(timestamp) > Long.valueOf(lastBackupTS);
+ } catch (IOException e) {
+ LOG.warn("Cannot read timestamp of log file " + path);
+ return false;
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java
new file mode 100644
index 0000000..72e4879
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface IncrementalRestoreService extends Configurable {
+
+ void run(String logDirectory, String[] fromTables, String[] toTables)
+ throws IOException;
+}
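+
+// A hypothetical implementing skeleton (illustrative only, not part of this patch):
+//
+// public class WALReplayRestoreService implements IncrementalRestoreService {
+//   private Configuration conf;
+//   @Override public void setConf(Configuration conf) { this.conf = conf; }
+//   @Override public Configuration getConf() { return conf; }
+//   @Override public void run(String logDir, String[] fromTables, String[] toTables)
+//       throws IOException {
+//     // replay WAL edits from logDir, mapping each fromTables[i] onto toTables[i]
+//   }
+// }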