You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2015/12/27 19:02:20 UTC

[4/6] hbase git commit: HBASE-14030 HBase Backup/Restore Phase 1 (Vladimir Rodionov)

http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupSystemTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupSystemTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupSystemTable.java
new file mode 100644
index 0000000..14769f9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupSystemTable.java
@@ -0,0 +1,642 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupHandler.BACKUPSTATUS;
+import org.apache.hadoop.hbase.backup.BackupUtil.BackupCompleteData;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+
+/**
+ * This class provides 'hbase:backup' table API
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class BackupSystemTable {
+
+  private static final Log LOG = LogFactory.getLog(BackupSystemTable.class);
+  private final static String TABLE_NAMESPACE = "hbase";
+  private final static String TABLE_NAME = "backup";
+  private final static TableName tableName = TableName.valueOf(TABLE_NAMESPACE, TABLE_NAME);
+  public final static byte[] familyName = "f".getBytes();
+
+  // Connection to HBase cluster
+  private static Connection connection;
+  // Cluster configuration
+  private static Configuration config;
+  // singleton
+  private static BackupSystemTable table;
+
+  /**
+   * Get instance by a given configuration
+   * @param conf - HBase configuration
+   * @return instance of BackupSystemTable
+   * @throws IOException exception
+   */
+  public synchronized static BackupSystemTable getTable(Configuration conf) throws IOException {
+    if (connection == null) {
+      connection = ConnectionFactory.createConnection(conf);
+      config = conf;
+      // Verify hbase:system exists
+      createSystemTableIfNotExists();
+      table = new BackupSystemTable();
+    }
+    return table;
+  }
+
+  /**
+   * TODO: refactor
+   * @throws IOException exception
+   */
+  public static void close() throws IOException {
+    connection.close();
+    table = null;
+  }
+
+  /**
+   * Gets table name
+   * @return table name
+   */
+  public static TableName getTableName() {
+    return tableName;
+  }
+
+  private static void createSystemTableIfNotExists() throws IOException {
+    Admin admin = null;
+    try {
+      admin = connection.getAdmin();
+      if (admin.tableExists(tableName) == false) {
+        HTableDescriptor tableDesc = new HTableDescriptor(tableName);
+        HColumnDescriptor colDesc = new HColumnDescriptor(familyName);
+        colDesc.setMaxVersions(1);
+        int ttl =
+            config.getInt(HConstants.BACKUP_SYSTEM_TTL_KEY, HConstants.BACKUP_SYSTEM_TTL_DEFAULT);
+        colDesc.setTimeToLive(ttl);
+        tableDesc.addFamily(colDesc);
+        admin.createTable(tableDesc);
+      }
+    } catch (IOException e) {
+      LOG.error(e);
+      throw e;
+    } finally {
+      if (admin != null) {
+        admin.close();
+      }
+    }
+  }
+
+  private BackupSystemTable() {
+  }
+
+  /**
+   * Updates status (state) of a backup session in hbase:backup table
+   * @param context context
+   * @throws IOException exception
+   */
+  public void updateBackupStatus(BackupContext context) throws IOException {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("update backup status in hbase:backup for: " + context.getBackupId()
+        + " set status=" + context.getFlag());
+    }
+    Table table = null;
+    try {
+      table = connection.getTable(tableName);
+      Put put = BackupSystemTableHelper.createPutForBackupContext(context);
+      table.put(put);
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+
+  /**
+   * Deletes backup status from hbase:backup table
+   * @param backupId backup id
+   * @throws IOException exception
+   */
+
+  public void deleteBackupStatus(String backupId) throws IOException {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("delete backup status in hbase:backup for " + backupId);
+    }
+    Table table = null;
+    try {
+      table = connection.getTable(tableName);
+      Delete del = BackupSystemTableHelper.createDeletForBackupContext(backupId);
+      table.delete(del);
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+
+  /**
+   * Reads backup status object (instance of BackupContext) from hbase:backup table
+   * @param backupId - backupId
+   * @return Current status of backup session or null
+   */
+
+  public BackupContext readBackupStatus(String backupId) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("read backup status from hbase:backup for: " + backupId);
+    }
+
+    Table table = null;
+    try {
+      table = connection.getTable(tableName);
+      Get get = BackupSystemTableHelper.createGetForBackupContext(backupId);
+      Result res = table.get(get);
+      if(res.isEmpty()){
+        return null;
+      }
+      return BackupSystemTableHelper.resultToBackupContext(res);
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+
+  /**
+   * Read the last backup start code (timestamp) of last successful backup. Will return null if
+   * there is no start code stored on hbase or the value is of length 0. These two cases indicate
+   * there is no successful backup completed so far.
+   * @return the timestamp of last successful backup
+   * @throws IOException exception
+   */
+  public String readBackupStartCode() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("read backup start code from hbase:backup");
+    }
+    Table table = null;
+    try {
+      table = connection.getTable(tableName);
+      Get get = BackupSystemTableHelper.createGetForStartCode();
+      Result res = table.get(get);
+      if (res.isEmpty()){
+        return null;
+      }
+      Cell cell = res.listCells().get(0);
+      byte[] val = CellUtil.cloneValue(cell);
+      if (val.length == 0){
+        return null;
+      }
+      return new String(val);
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+
+  /**
+   * Write the start code (timestamp) to hbase:backup. If passed in null, then write 0 byte.
+   * @param startCode start code
+   * @throws IOException exception
+   */
+  public void writeBackupStartCode(String startCode) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("write backup start code to hbase:backup " + startCode);
+    }
+    Table table = null;
+    try {
+      table = connection.getTable(tableName);
+      Put put = BackupSystemTableHelper.createPutForStartCode(startCode);
+      table.put(put);
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+
+  /**
+   * Get the Region Servers log information after the last log roll from hbase:backup.
+   * @return RS log info
+   * @throws IOException exception
+   */
+  public HashMap<String, String> readRegionServerLastLogRollResult() 
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("read region server last roll log result to hbase:backup");
+    }
+    Table table = null;
+    ResultScanner scanner = null;
+
+    try {
+      table = connection.getTable(tableName);
+      Scan scan = BackupSystemTableHelper.createScanForReadRegionServerLastLogRollResult();
+      scan.setMaxVersions(1);
+      scanner = table.getScanner(scan);
+      Result res = null;
+      HashMap<String, String> rsTimestampMap = new HashMap<String, String>();
+      while ((res = scanner.next()) != null) {
+        res.advance();
+        Cell cell = res.current();
+        byte[] row = CellUtil.cloneRow(cell);
+        String server =
+            BackupSystemTableHelper.getServerNameForReadRegionServerLastLogRollResult(row);
+
+        byte[] data = CellUtil.cloneValue(cell);
+        rsTimestampMap.put(server, new String(data));
+      }
+      return rsTimestampMap;
+    } finally {
+      if (scanner != null) {
+        scanner.close();
+      }
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+
+  /**
+   * Writes Region Server last roll log result (timestamp) to hbase:backup table
+   * @param server - Region Server name
+   * @param fileName - last log timestamp
+   * @throws IOException exception
+   */
+  public void writeRegionServerLastLogRollResult(String server, String fileName) 
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("write region server last roll log result to hbase:backup");
+    }
+    Table table = null;
+    try {
+      table = connection.getTable(tableName);
+      Put put = 
+          BackupSystemTableHelper.createPutForRegionServerLastLogRollResult(server, fileName);
+      table.put(put);
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+
+  /**
+   * Get all completed backup information (in desc order by time)
+   * @return history info of BackupCompleteData
+   * @throws IOException exception
+   */
+  public ArrayList<BackupCompleteData> getBackupHistory() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("get backup history from hbase:backup");
+    }
+    Table table = null;
+    ResultScanner scanner = null;
+    ArrayList<BackupCompleteData> list = new ArrayList<BackupCompleteData>();
+    try {
+      table = connection.getTable(tableName);
+      Scan scan = BackupSystemTableHelper.createScanForBackupHistory();
+      scan.setMaxVersions(1);
+      scanner = table.getScanner(scan);
+      Result res = null;
+      while ((res = scanner.next()) != null) {
+        res.advance();
+        BackupContext context = BackupSystemTableHelper.cellToBackupContext(res.current());
+        if (context.getFlag() != BACKUPSTATUS.COMPLETE) {
+          continue;
+        }
+
+        BackupCompleteData history = new BackupCompleteData();
+        history.setBackupToken(context.getBackupId());
+        history.setStartTime(Long.toString(context.getStartTs()));
+        history.setEndTime(Long.toString(context.getEndTs()));
+        history.setBackupRootPath(context.getTargetRootDir());
+        history.setTableList(context.getTableListAsString());
+        history.setType(context.getType());
+        history.setBytesCopied(Long.toString(context.getTotalBytesCopied()));
+
+        if (context.fromExistingSnapshot()) {
+          history.markFromExistingSnapshot();
+        }
+        list.add(history);
+      }
+      return BackupUtil.sortHistoryListDesc(list);
+    } finally {
+      if (scanner != null) {
+        scanner.close();
+      }
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+
+  /**
+   * Get all backup session with a given status (in desc order by time)
+   * @param status status
+   * @return history info of backup contexts
+   * @throws IOException exception
+   */
+  public ArrayList<BackupContext> getBackupContexts(BACKUPSTATUS status) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("get backup contexts from hbase:backup");
+    }
+    Table table = null;
+    ResultScanner scanner = null;
+    ArrayList<BackupContext> list = new ArrayList<BackupContext>();
+    try {
+      table = connection.getTable(tableName);
+      Scan scan = BackupSystemTableHelper.createScanForBackupHistory();
+      scan.setMaxVersions(1);
+      scanner = table.getScanner(scan);
+      Result res = null;
+      while ((res = scanner.next()) != null) {
+        res.advance();
+        BackupContext context = BackupSystemTableHelper.cellToBackupContext(res.current());
+        if (context.getFlag() != status){
+          continue;
+        }
+        list.add(context);
+      }
+      return list;
+    } finally {
+      if (scanner != null) {
+        scanner.close();
+      }
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+
+  /**
+   * Write the current timestamps for each regionserver to hbase:backup after a successful full or
+   * incremental backup. The saved timestamp is of the last log file that was backed up already.
+   * @param tables tables
+   * @param newTimestamps timestamps
+   * @throws IOException exception
+   */
+  public void writeRegionServerLogTimestamp(Set<String> tables,
+      HashMap<String, String> newTimestamps) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("write RS log ts to HBASE_BACKUP");
+    }
+    StringBuilder sb = new StringBuilder();
+    for (Map.Entry<String, String> entry : newTimestamps.entrySet()) {
+      String host = entry.getKey();
+      String timestamp = entry.getValue();
+      sb.append(host).append(BackupUtil.FIELD_SEPARATOR).append(timestamp)
+      .append(BackupUtil.RECORD_SEPARATOR);
+    }
+    String smap = sb.toString();
+    List<Put> puts = new ArrayList<Put>();
+    for (String table : tables) {
+      Put put = BackupSystemTableHelper.createPutForWriteRegionServerLogTimestamp(table, smap);
+      puts.add(put);
+    }
+    Table table = null;
+    try {
+      table = connection.getTable(tableName);
+      table.put(puts);
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+
+  /**
+   * Read the timestamp for each region server log after the last successful backup. Each table has
+   * its own set of the timestamps. The info is stored for each table as a concatenated string of
+   * rs->timestapmp
+   * @return the timestamp for each region server. key: tableName value:
+   *         RegionServer,PreviousTimeStamp
+   * @throws IOException exception
+   */
+  public HashMap<String, HashMap<String, String>> readLogTimestampMap() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("read RS log ts from HBASE_BACKUP");
+    }
+
+    Table table = null;
+    ResultScanner scanner = null;
+    HashMap<String, HashMap<String, String>> tableTimestampMap =
+        new HashMap<String, HashMap<String, String>>();
+
+    try {
+      table = connection.getTable(tableName);
+      Scan scan = BackupSystemTableHelper.createScanForReadLogTimestampMap();
+      scanner = table.getScanner(scan);
+      Result res = null;
+      while ((res = scanner.next()) != null) {
+        res.advance();
+        Cell cell = res.current();
+        byte[] row = CellUtil.cloneRow(cell);
+        String tabName = BackupSystemTableHelper.getTableNameForReadLogTimestampMap(row);
+        HashMap<String, String> lastBackup = new HashMap<String, String>();
+        byte[] data = CellUtil.cloneValue(cell);
+        if (data == null) {
+          // TODO
+          throw new IOException("Data of last backup data from HBASE_BACKUP "
+              + "is empty. Create a backup first.");
+        }
+        if (data != null && data.length > 0) {
+          String s = new String(data);
+          String[] records = s.split(BackupUtil.RECORD_SEPARATOR);
+          for (String record : records) {
+            String[] flds = record.split(BackupUtil.FIELD_SEPARATOR);
+            if (flds.length != 2) {
+              throw new IOException("data from HBASE_BACKUP is corrupted: "
+                  + Arrays.toString(flds));
+            }
+            lastBackup.put(flds[0], flds[1]);
+          }
+          tableTimestampMap.put(tabName, lastBackup);
+        }
+      }
+      return tableTimestampMap;
+    } finally {
+      if (scanner != null) {
+        scanner.close();
+      }
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+
+  /**
+   * Return the current tables covered by incremental backup.
+   * @return set of tableNames
+   * @throws IOException exception
+   */
+  public Set<String> getIncrementalBackupTableSet() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("get incr backup table set from hbase:backup");
+    }
+    Table table = null;
+    TreeSet<String> set = new TreeSet<String>();
+
+    try {
+      table = connection.getTable(tableName);
+      Get get = BackupSystemTableHelper.createGetForIncrBackupTableSet();
+      Result res = table.get(get);
+      if (res.isEmpty()) {
+        return set;
+      }
+      List<Cell> cells = res.listCells();
+      for (Cell cell : cells) {
+        // qualifier = table name - we use table names as qualifiers
+        // TODO ns:table as qualifier?
+        set.add(new String(CellUtil.cloneQualifier(cell)));
+      }
+      return set;
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+
+  /**
+   * Add tables to global incremental backup set
+   * @param tables - set of tables
+   * @throws IOException exception
+   */
+  public void addIncrementalBackupTableSet(Set<String> tables) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("add incr backup table set to hbase:backup");
+    }
+    Table table = null;
+    try {
+      table = connection.getTable(tableName);
+      Put put = BackupSystemTableHelper.createPutForIncrBackupTableSet(tables);
+      table.put(put);
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+
+  /**
+   * Register WAL files as eligible for deletion
+   * @param files files
+   * @throws IOException exception
+   */
+  public void addWALFiles(List<String> files, String backupId) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("add WAL files to hbase:backup");
+    }
+    Table table = null;
+    try {
+      table = connection.getTable(tableName);
+      List<Put> puts = BackupSystemTableHelper.createPutsForAddWALFiles(files, backupId);
+      table.put(puts);
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+
+  /**
+   * Check if WAL file is eligible for deletion
+   * @param file file
+   * @return true, if - yes.
+   * @throws IOException exception
+   */
+  public boolean checkWALFile(String file) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Check if WAL file has been already backuped in hbase:backup");
+    }
+    Table table = null;
+    try {
+      table = connection.getTable(tableName);
+      Get get = BackupSystemTableHelper.createGetForCheckWALFile(file);
+      Result res = table.get(get);
+      if (res.isEmpty()){
+        return false;
+      }
+      return true;
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+
+  /**
+   * Checks if we have at least one backup session in hbase:backup This API is used by
+   * BackupLogCleaner
+   * @return true, if - at least one session exists in hbase:backup table
+   * @throws IOException exception
+   */
+  public boolean hasBackupSessions() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("has backup sessions from hbase:backup");
+    }
+    Table table = null;
+    ResultScanner scanner = null;
+    boolean result = false;
+    try {
+      table = connection.getTable(tableName);
+      Scan scan = BackupSystemTableHelper.createScanForBackupHistory();
+      scan.setMaxVersions(1);
+      scan.setCaching(1);
+      scanner = table.getScanner(scan);
+      if (scanner.next() != null) {
+        result = true;
+      }
+      return result;
+    } finally {
+      if (scanner != null) {
+        scanner.close();
+      }
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupSystemTableHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupSystemTableHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupSystemTableHelper.java
new file mode 100644
index 0000000..bf62a84
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupSystemTableHelper.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+
+
+/**
+ * A collection for methods used by BackupSystemTable.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class BackupSystemTableHelper {
+
+  /**
+   * hbase:backup schema: 
+   * 1. Backup sessions rowkey= "session." + backupId; value = serialized
+   * BackupContext 
+   * 2. Backup start code rowkey = "startcode"; value = startcode 
+   * 3. Incremental backup set rowkey="incrbackupset"; value=[list of tables] 
+   * 4. Table-RS-timestamp map rowkey="trslm"+ table_name; value = map[RS-> last WAL timestamp] 
+   * 5. RS - WAL ts map rowkey="rslogts."+server; value = last WAL timestamp 
+   * 6. WALs recorded rowkey="wals."+WAL unique file name; value = NULL (value is not used)
+   */
+  private static final Log LOG = LogFactory.getLog(BackupSystemTableHelper.class);
+
+  private final static String BACKUP_CONTEXT_PREFIX = "session.";
+  private final static String START_CODE_ROW = "startcode";
+  private final static String INCR_BACKUP_SET = "incrbackupset";
+  private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm.";
+  private final static String RS_LOG_TS_PREFIX = "rslogts.";
+  private final static String WALS_PREFIX = "wals.";
+
+  private final static byte[] q0 = "0".getBytes();
+  private final static byte[] EMPTY_VALUE = new byte[] {};
+
+  private BackupSystemTableHelper() {
+    throw new AssertionError("Instantiating utility class...");
+  }
+
+  /**
+   * Creates Put operation for a given backup context object
+   * @param context backup context
+   * @return put operation
+   * @throws IOException exception
+   */
+  static Put createPutForBackupContext(BackupContext context) throws IOException {
+
+    Put put = new Put((BACKUP_CONTEXT_PREFIX + context.getBackupId()).getBytes());
+    put.addColumn(BackupSystemTable.familyName, q0, context.toByteArray());
+    return put;
+  }
+
+  /**
+   * Creates Get operation for a given backup id
+   * @param backupId - backup's ID
+   * @return get operation
+   * @throws IOException exception
+   */
+  static Get createGetForBackupContext(String backupId) throws IOException {
+    Get get = new Get((BACKUP_CONTEXT_PREFIX + backupId).getBytes());
+    get.addFamily(BackupSystemTable.familyName);
+    get.setMaxVersions(1);
+    return get;
+  }
+
+  /**
+   * Creates Delete operation for a given backup id
+   * @param backupId - backup's ID
+   * @return delete operation
+   * @throws IOException exception
+   */
+  public static Delete createDeletForBackupContext(String backupId) {
+    Delete del = new Delete((BACKUP_CONTEXT_PREFIX + backupId).getBytes());
+    del.addFamily(BackupSystemTable.familyName);
+    return del;
+  }
+
+  /**
+   * Converts Result to BackupContext
+   * @param res - HBase result
+   * @return backup context instance
+   * @throws IOException exception
+   */
+  static BackupContext resultToBackupContext(Result res) throws IOException {
+    res.advance();
+    Cell cell = res.current();
+    return cellToBackupContext(cell);
+  }
+
+  /**
+   * Creates Get operation to retrieve start code from hbase:backup
+   * @return get operation
+   * @throws IOException exception
+   */
+  static Get createGetForStartCode() throws IOException {
+    Get get = new Get(START_CODE_ROW.getBytes());
+    get.addFamily(BackupSystemTable.familyName);
+    get.setMaxVersions(1);
+    return get;
+  }
+
+  /**
+   * Creates Put operation to store start code to hbase:backup
+   * @return put operation
+   * @throws IOException exception
+   */
+  static Put createPutForStartCode(String startCode) {
+    Put put = new Put(START_CODE_ROW.getBytes());
+    put.addColumn(BackupSystemTable.familyName, q0, startCode.getBytes());
+    return put;
+  }
+
+  /**
+   * Creates Get to retrieve incremental backup table set from hbase:backup
+   * @return get operation
+   * @throws IOException exception
+   */
+  static Get createGetForIncrBackupTableSet() throws IOException {
+    Get get = new Get(INCR_BACKUP_SET.getBytes());
+    get.addFamily(BackupSystemTable.familyName);
+    get.setMaxVersions(1);
+    return get;
+  }
+
+  /**
+   * Creates Put to store incremental backup table set
+   * @param tables tables
+   * @return put operation
+   */
+  static Put createPutForIncrBackupTableSet(Set<String> tables) {
+    Put put = new Put(INCR_BACKUP_SET.getBytes());
+    for (String table : tables) {
+      put.addColumn(BackupSystemTable.familyName, table.getBytes(), EMPTY_VALUE);
+    }
+    return put;
+  }
+
+  /**
+   * Creates Scan operation to load backup history
+   * @return scan operation
+   */
+  static Scan createScanForBackupHistory() {
+    Scan scan = new Scan();
+    byte[] startRow = BACKUP_CONTEXT_PREFIX.getBytes();
+    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+    scan.setStartRow(startRow);
+    scan.setStopRow(stopRow);
+    scan.addFamily(BackupSystemTable.familyName);
+
+    return scan;
+  }
+
+  /**
+   * Converts cell to backup context instance.
+   * @param current - cell
+   * @return backup context instance
+   * @throws IOException exception
+   */
+  static BackupContext cellToBackupContext(Cell current) throws IOException {
+    byte[] data = CellUtil.cloneValue(current);
+    try {
+      BackupContext ctxt = BackupContext.fromByteArray(data);
+      return ctxt;
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Creates Put to write RS last roll log timestamp map
+   * @param table - table
+   * @param smap - map, containing RS:ts
+   * @return put operation
+   */
+  static Put createPutForWriteRegionServerLogTimestamp(String table, String smap) {
+    Put put = new Put((TABLE_RS_LOG_MAP_PREFIX + table).getBytes());
+    put.addColumn(BackupSystemTable.familyName, q0, smap.getBytes());
+    return put;
+  }
+
+  /**
+   * Creates Scan to load table-> { RS -> ts} map of maps
+   * @return scan operation
+   */
+  static Scan createScanForReadLogTimestampMap() {
+    Scan scan = new Scan();
+    byte[] startRow = TABLE_RS_LOG_MAP_PREFIX.getBytes();
+    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+    scan.setStartRow(startRow);
+    scan.setStopRow(stopRow);
+    scan.addFamily(BackupSystemTable.familyName);
+
+    return scan;
+  }
+
+  /**
+   * Get table name from rowkey
+   * @param cloneRow rowkey
+   * @return table name
+   */
+  static String getTableNameForReadLogTimestampMap(byte[] cloneRow) {
+    int prefixSize = TABLE_RS_LOG_MAP_PREFIX.length();
+    return new String(cloneRow, prefixSize, cloneRow.length - prefixSize);
+  }
+
+  /**
+   * Creates Put to store RS last log result
+   * @param server - server name
+   * @param fileName - log roll result (timestamp)
+   * @return put operation
+   */
+  static Put createPutForRegionServerLastLogRollResult(String server, String fileName) {
+    Put put = new Put((RS_LOG_TS_PREFIX + server).getBytes());
+    put.addColumn(BackupSystemTable.familyName, q0, fileName.getBytes());
+    return put;
+  }
+
+  /**
+   * Creates Scan operation to load last RS log roll results
+   * @return scan operation
+   */
+  static Scan createScanForReadRegionServerLastLogRollResult() {
+    Scan scan = new Scan();
+    byte[] startRow = RS_LOG_TS_PREFIX.getBytes();
+    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+    scan.setStartRow(startRow);
+    scan.setStopRow(stopRow);
+    scan.addFamily(BackupSystemTable.familyName);
+
+    return scan;
+  }
+
+  /**
+   * Get server's name from rowkey
+   * @param row - rowkey
+   * @return server's name
+   */
+  static String getServerNameForReadRegionServerLastLogRollResult(byte[] row) {
+    int prefixSize = RS_LOG_TS_PREFIX.length();
+    return new String(row, prefixSize, row.length - prefixSize);
+  }
+
+  /**
+   * Creates put list for list of WAL files
+   * @param files list of WAL file paths
+   * @param backupId backup id
+   * @return put list
+   * @throws IOException exception
+   */
+  public static List<Put> createPutsForAddWALFiles(List<String> files, String backupId)
+      throws IOException {
+
+    List<Put> puts = new ArrayList<Put>();
+    for (String file : files) {
+      LOG.debug("+++ put: " + BackupUtil.getUniqueWALFileNamePart(file));
+      byte[] row = (WALS_PREFIX + BackupUtil.getUniqueWALFileNamePart(file)).getBytes();
+      Put put = new Put(row);
+      put.addColumn(BackupSystemTable.familyName, q0, backupId.getBytes());
+      puts.add(put);
+    }
+    return puts;
+  }
+
+  /**
+   * Creates Get operation for a given wal file name
+   * @param file file
+   * @return get operation
+   * @throws IOException exception
+   */
+  public static Get createGetForCheckWALFile(String file) throws IOException {
+    byte[] row = (WALS_PREFIX + BackupUtil.getUniqueWALFileNamePart(file)).getBytes();
+    Get get = new Get(row);
+    get.addFamily(BackupSystemTable.familyName);
+    get.setMaxVersions(1);
+    return get;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupUtil.java
new file mode 100644
index 0000000..ff8bd2e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupUtil.java
@@ -0,0 +1,564 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+
+/**
+ * A collection for methods used by multiple classes to backup HBase tables.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class BackupUtil {
+  protected static final Log LOG = LogFactory.getLog(BackupUtil.class);
+
+  public static final String FIELD_SEPARATOR = "\001";
+  public static final String RECORD_SEPARATOR = "\002";
+  public static final String LOGNAME_SEPARATOR = ".";
+  protected static final String HDFS = "hdfs://";
+  protected static Configuration conf = null;
+
+  private BackupUtil(){
+    throw new AssertionError("Instantiating utility class...");
+  }
+
+  /**
+   * Set the configuration from a given one.
+   * @param newConf A new given configuration
+   */
+  public synchronized static void setConf(Configuration newConf) {
+    conf = newConf;
+  }
+
+  /**
+   * Get and merge Hadoop and HBase configuration.
+   * @throws IOException exception
+   */
+  protected static Configuration getConf() {
+    if (conf == null) {
+      conf = new Configuration();
+      HBaseConfiguration.merge(conf, HBaseConfiguration.create());
+    }
+    return conf;
+  }
+
+  /**
+   * Loop through the RS log timestamp map for the tables, for each RS, find the min timestamp value
+   * for the RS among the tables.
+   * @param rsLogTimestampMap timestamp map
+   * @return the min timestamp of each RS
+   */
+  protected static HashMap<String, String> getRSLogTimestampMins(
+    HashMap<String, HashMap<String, String>> rsLogTimestampMap) {
+
+    if (rsLogTimestampMap == null || rsLogTimestampMap.isEmpty()) {
+      return null;
+    }
+
+    HashMap<String, String> rsLogTimestamptMins = new HashMap<String, String>();
+    HashMap<String, HashMap<String, String>> rsLogTimestampMapByRS =
+        new HashMap<String, HashMap<String, String>>();
+
+    for (Entry<String, HashMap<String, String>> tableEntry : rsLogTimestampMap.entrySet()) {
+      String table = tableEntry.getKey();
+      HashMap<String, String> rsLogTimestamp = tableEntry.getValue();
+      for (Entry<String, String> rsEntry : rsLogTimestamp.entrySet()) {
+        String rs = rsEntry.getKey();
+        String ts = rsEntry.getValue();
+        if (!rsLogTimestampMapByRS.containsKey(rs)) {
+          rsLogTimestampMapByRS.put(rs, new HashMap<String, String>());
+          rsLogTimestampMapByRS.get(rs).put(table, ts);
+        } else {
+          rsLogTimestampMapByRS.get(rs).put(table, ts);
+        }
+      }
+    }
+
+    for (String rs : rsLogTimestampMapByRS.keySet()) {
+      rsLogTimestamptMins.put(rs, getMinValue(rsLogTimestampMapByRS.get(rs)));
+    }
+
+    return rsLogTimestamptMins;
+  }
+
+  /**
+   * Get the min value for all the Values a map.
+   * @param map map
+   * @return the min value
+   */
+  protected static String getMinValue(HashMap<String, String> map) {
+    String minTimestamp = null;
+    if (map != null) {
+      ArrayList<String> timestampList = new ArrayList<String>(map.values());
+      Collections.sort(timestampList, new Comparator<String>() {
+        @Override
+        public int compare(String s1, String s2) {
+          long l1 = Long.valueOf(s1);
+          long l2 = Long.valueOf(s2);
+          if (l1 > l2) {
+            return 1;
+          } else if (l1 < l2) {
+            return -1;
+          } else {
+            return 0;
+          }
+        }
+      });
+      // The min among all the RS log timestamps will be kept in ZK.
+      minTimestamp = timestampList.get(0);
+    }
+    return minTimestamp;
+  }
+
+  /**
+   * copy out Table RegionInfo into incremental backup image need to consider move this logic into
+   * HBackupFileSystem
+   * @param backupContext backup context
+   * @param conf configuration
+   * @throws IOException exception
+   * @throws InterruptedException exception
+   */
+  protected static void copyTableRegionInfo(BackupContext backupContext, Configuration conf)
+      throws IOException, InterruptedException {
+
+    Path rootDir = FSUtils.getRootDir(conf);
+    FileSystem fs = rootDir.getFileSystem(conf);
+
+    // for each table in the table set, copy out the table info and region info files in the correct
+    // directory structure
+    for (String table : backupContext.getTables()) {
+
+      LOG.debug("Attempting to copy table info for:" + table);
+      TableDescriptor orig =
+          FSTableDescriptors.getTableDescriptorFromFs(fs, rootDir, TableName.valueOf(table));
+
+      // write a copy of descriptor to the target directory
+      Path target = new Path(backupContext.getBackupStatus(table).getTargetDir());
+      FileSystem targetFs = target.getFileSystem(conf);
+      FSTableDescriptors descriptors =
+          new FSTableDescriptors(conf, targetFs, FSUtils.getRootDir(conf));
+      descriptors.createTableDescriptorForTableDirectory(target, orig, false);
+      LOG.debug("Finished copying tableinfo.");
+
+      HBaseAdmin hbadmin = null;
+      // TODO: optimize
+      Connection conn = null;
+      List<HRegionInfo> regions = null;
+      try {
+        conn = ConnectionFactory.createConnection(conf);
+        hbadmin = (HBaseAdmin) conn.getAdmin();
+        regions = hbadmin.getTableRegions(TableName.valueOf(table));
+      } catch (Exception e) {
+        throw new BackupException(e);
+      } finally {
+        if (hbadmin != null) {
+          hbadmin.close();
+        }
+        if(conn != null){
+          conn.close();
+        }
+      }
+
+      // For each region, write the region info to disk
+      LOG.debug("Starting to write region info for table " + table);
+      for (HRegionInfo regionInfo : regions) {
+        Path regionDir =
+            HRegion.getRegionDir(new Path(backupContext.getBackupStatus(table).getTargetDir()),
+              regionInfo);
+        regionDir =
+            new Path(backupContext.getBackupStatus(table).getTargetDir(), regionDir.getName());
+        writeRegioninfoOnFilesystem(conf, targetFs, regionDir, regionInfo);
+      }
+      LOG.debug("Finished writing region info for table " + table);
+    }
+  }
+
+  /**
+   * Write the .regioninfo file on-disk.
+   */
+  public static void writeRegioninfoOnFilesystem(final Configuration conf, final FileSystem fs,
+      final Path regionInfoDir, HRegionInfo regionInfo) throws IOException {
+    final byte[] content = regionInfo.toDelimitedByteArray();
+    Path regionInfoFile = new Path(regionInfoDir, ".regioninfo");
+    // First check to get the permissions
+    FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
+    // Write the RegionInfo file content
+    FSDataOutputStream out = FSUtils.create(conf, fs, regionInfoFile, perms, null);
+    try {
+      out.write(content);
+    } finally {
+      out.close();
+    }
+  }
+
+  /**
+   * TODO: verify the code
+   * @param p path
+   * @return host name
+   * @throws IOException exception
+   */
+  protected static String parseHostFromOldLog(Path p) throws IOException {
+    String n = p.getName();
+    int idx = n.lastIndexOf(LOGNAME_SEPARATOR);
+    String s = URLDecoder.decode(n.substring(0, idx), "UTF8");
+    return ServerName.parseHostname(s);
+  }
+
+  public static String parseHostNameFromLogFile(Path p) throws IOException {
+    if (isArchivedLogFile(p)) {
+      return parseHostFromOldLog(p);
+    } else {
+      return DefaultWALProvider.getServerNameFromWALDirectoryName(p).getHostname();
+    }
+  }
+
+  private static boolean isArchivedLogFile(Path p) {
+    String oldLog = Path.SEPARATOR + HConstants.HREGION_OLDLOGDIR_NAME + Path.SEPARATOR;
+    return p.toString().contains(oldLog);
+  }
+
+  /**
+   * Return WAL file name
+   * @param walFileName WAL file name
+   * @return WAL file name
+   * @throws IOException exception
+   * @throws IllegalArgumentException exception
+   */
+  public static String getUniqueWALFileNamePart(String walFileName) throws IOException {
+    return new Path(walFileName).getName();
+  }
+
+  /**
+   * Return WAL file name
+   * @param p - WAL file path
+   * @return WAL file name
+   * @throws IOException exception
+   */
+  public static String getUniqueWALFileNamePart(Path p) throws IOException {
+    return p.getName();
+  }
+
+  /**
+   * Given the log file, parse the timestamp from the file name. The timestamp is the last number.
+   * @param p a path to the log file
+   * @return the timestamp
+   * @throws IOException exception
+   */
+  protected static String getCreationTime(Path p, Configuration conf) throws IOException {
+    int idx = p.getName().lastIndexOf(LOGNAME_SEPARATOR);
+    if (idx < 0) {
+      throw new IOException("Cannot parse timestamp from path " + p);
+    }
+    String ts = p.getName().substring(idx + 1);
+    return ts;
+  }
+
+  /**
+   * Get the total length of files under the given directory recursively.
+   * @param fs The hadoop file system
+   * @param dir The target directory
+   * @return the total length of files
+   * @throws IOException exception
+   */
+  public static long getFilesLength(FileSystem fs, Path dir) throws IOException {
+    long totalLength = 0;
+    FileStatus[] files = FSUtils.listStatus(fs, dir);
+    if (files != null) {
+      for (FileStatus fileStatus : files) {
+        if (fileStatus.isDir()) {
+          totalLength += getFilesLength(fs, fileStatus.getPath());
+        } else {
+          totalLength += fileStatus.getLen();
+        }
+      }
+    }
+    return totalLength;
+  }
+
+  /**
+   * Keep the record for dependency for incremental backup and history info p.s, we may be able to
+   * merge this class into backupImage class later
+   */
+  public static class BackupCompleteData implements Comparable<BackupCompleteData> {
+    private String startTime;
+    private String endTime;
+    private String type;
+    private String backupRootPath;
+    private String tableList;
+    private String backupToken;
+    private String bytesCopied;
+    private List<String> ancestors;
+    private boolean fromExistingSnapshot = false;
+
+    public List<String> getAncestors() {
+      if (fromExistingSnapshot) {
+        return null;
+      }
+      if (this.ancestors == null) {
+        this.ancestors = new ArrayList<String>();
+      }
+      return this.ancestors;
+    }
+
+    public void addAncestor(String backupToken) {
+      this.getAncestors().add(backupToken);
+    }
+
+    public String getBytesCopied() {
+      return bytesCopied;
+    }
+
+    public void setBytesCopied(String bytesCopied) {
+      this.bytesCopied = bytesCopied;
+    }
+
+    public String getBackupToken() {
+      return backupToken;
+    }
+
+    public void setBackupToken(String backupToken) {
+      this.backupToken = backupToken;
+    }
+
+    public String getStartTime() {
+      return startTime;
+    }
+
+    public void setStartTime(String startTime) {
+      this.startTime = startTime;
+    }
+
+    public String getEndTime() {
+      return endTime;
+    }
+
+    public void setEndTime(String endTime) {
+      this.endTime = endTime;
+    }
+
+    public String getType() {
+      return type;
+    }
+
+    public void setType(String type) {
+      this.type = type;
+    }
+
+    public String getBackupRootPath() {
+      return backupRootPath;
+    }
+
+    public void setBackupRootPath(String backupRootPath) {
+      this.backupRootPath = backupRootPath;
+    }
+
+    public String getTableList() {
+      return tableList;
+    }
+
+    public void setTableList(String tableList) {
+      this.tableList = tableList;
+    }
+
+    public boolean fromExistingSnapshot() {
+      return this.fromExistingSnapshot;
+    }
+
+    public void markFromExistingSnapshot() {
+      this.fromExistingSnapshot = true;
+    }
+
+    @Override
+    public int compareTo(BackupCompleteData o) {
+      Long thisTS =
+          new Long(this.getBackupToken().substring(this.getBackupToken().lastIndexOf("_") + 1));
+      Long otherTS =
+          new Long(o.getBackupToken().substring(o.getBackupToken().lastIndexOf("_") + 1));
+      return thisTS.compareTo(otherTS);
+    }
+
+  }
+
+  /**
+   * Sort history list by start time in descending order.
+   * @param historyList history list
+   * @return sorted list of BackupCompleteData
+   */
+  public static ArrayList<BackupCompleteData> sortHistoryListDesc(
+    ArrayList<BackupCompleteData> historyList) {
+    ArrayList<BackupCompleteData> list = new ArrayList<BackupCompleteData>();
+    TreeMap<String, BackupCompleteData> map = new TreeMap<String, BackupCompleteData>();
+    for (BackupCompleteData h : historyList) {
+      map.put(h.getStartTime(), h);
+    }
+    Iterator<String> i = map.descendingKeySet().iterator();
+    while (i.hasNext()) {
+      list.add(map.get(i.next()));
+    }
+    return list;
+  }
+
+  /**
+   * Get list of all WAL files (WALs and archive)
+   * @param c - configuration
+   * @return list of WAL files
+   * @throws IOException exception
+   */
+  public static List<String> getListOfWALFiles(Configuration c) throws IOException {
+    Path rootDir = FSUtils.getRootDir(c);
+    Path logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
+    Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    List<String> logFiles = new ArrayList<String>();
+
+    FileSystem fs = FileSystem.get(c);
+    logFiles = getFiles(fs, logDir, logFiles, null);
+    logFiles = getFiles(fs, oldLogDir, logFiles, null);
+    return logFiles;
+  }
+
+  /**
+   * Get list of all WAL files (WALs and archive)
+   * @param c - configuration
+   * @return list of WAL files
+   * @throws IOException exception
+   */
+  public static List<String> getListOfWALFiles(Configuration c, PathFilter filter)
+      throws IOException {
+    Path rootDir = FSUtils.getRootDir(c);
+    Path logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
+    Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    List<String> logFiles = new ArrayList<String>();
+
+    FileSystem fs = FileSystem.get(c);
+    logFiles = getFiles(fs, logDir, logFiles, filter);
+    logFiles = getFiles(fs, oldLogDir, logFiles, filter);
+    return logFiles;
+  }
+
+  /**
+   * Get list of all old WAL files (WALs and archive)
+   * @param c - configuration
+   * @return list of WAL files
+   * @throws IOException exception
+   */
+  public static List<String> getWALFilesOlderThan(final Configuration c,
+    final HashMap<String, String> hostTimestampMap) throws IOException {
+    Path rootDir = FSUtils.getRootDir(c);
+    Path logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
+    Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    List<String> logFiles = new ArrayList<String>();
+
+    PathFilter filter = new PathFilter() {
+
+      @Override
+      public boolean accept(Path p) {
+        try {
+          if (DefaultWALProvider.isMetaFile(p)) {
+            return false;
+          }
+          String host = BackupUtil.parseHostNameFromLogFile(p);
+          String oldTimestamp = hostTimestampMap.get(host);
+          String currentLogTS = getCreationTime(p, c);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("path=" + p);
+            LOG.debug("oldTimestamp=" + oldTimestamp);
+            LOG.debug("currentLogTS=" + currentLogTS);
+          }
+          return Long.parseLong(currentLogTS) <= Long.parseLong(oldTimestamp);
+        } catch (IOException e) {
+          LOG.error(e);
+          return false;
+        }
+      }
+    };
+    FileSystem fs = FileSystem.get(c);
+    logFiles = getFiles(fs, logDir, logFiles, filter);
+    logFiles = getFiles(fs, oldLogDir, logFiles, filter);
+    return logFiles;
+  }
+
+  private static List<String> getFiles(FileSystem fs, Path rootDir, List<String> files,
+    PathFilter filter) throws FileNotFoundException, IOException {
+
+    RemoteIterator<LocatedFileStatus> it = fs.listFiles(rootDir, true);
+
+    while (it.hasNext()) {
+      LocatedFileStatus lfs = it.next();
+      if (lfs.isDirectory()) {
+        continue;
+      }
+      // apply filter
+      if (filter.accept(lfs.getPath())) {
+        files.add(lfs.getPath().toString());
+        LOG.info(lfs.getPath());
+      }
+    }
+    return files;
+  }
+
+  public static String concat(Collection<String> col, String separator) {
+    if (col.size() == 0) {
+      return "";
+    }
+    StringBuilder sb = new StringBuilder();
+    for (String s : col) {
+      sb.append(s + separator);
+    }
+    sb.deleteCharAt(sb.lastIndexOf(";"));
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
new file mode 100644
index 0000000..74411da
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
@@ -0,0 +1,511 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * View to an on-disk Backup Image FileSytem
+ * Provides the set of methods necessary to interact with the on-disk Backup Image data.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class HBackupFileSystem {
+  public static final Log LOG = LogFactory.getLog(HBackupFileSystem.class);
+
+  private final String RESTORE_TMP_PATH = "/tmp/restoreTemp";
+  private final String[] ignoreDirs = { "recovered.edits" };
+
+  private final Configuration conf;
+  private final FileSystem fs;
+  private final Path backupRootPath;
+  private final String backupId;
+
+  /**
+   * Create a view to the on-disk Backup Image. 
+   * @param conf  to use
+   * @param backupPath  to where the backup Image stored
+   * @param backupId represent backup Image
+   */
+  HBackupFileSystem(final Configuration conf, final Path backupRootPath, final String backupId)
+      throws IOException {
+    this.conf = conf;
+    this.fs = backupRootPath.getFileSystem(conf);
+    this.backupRootPath = backupRootPath;
+    this.backupId = backupId; // the backup ID for the lead backup Image
+  }
+
+  /**
+   * @param tableName is the table backuped
+   * @return {@link HTableDescriptor} saved in backup image of the table
+   */
+  protected HTableDescriptor getTableDesc(String tableName) throws FileNotFoundException,
+  IOException {
+
+    Path tableInfoPath = this.getTableInfoPath(tableName);
+    LOG.debug("tableInfoPath = " + tableInfoPath.toString());
+    SnapshotDescription desc = SnapshotDescriptionUtils.readSnapshotInfo(fs, tableInfoPath);
+    LOG.debug("desc = " + desc.getName());
+    SnapshotManifest manifest = SnapshotManifest.open(conf, fs, tableInfoPath, desc);
+    HTableDescriptor tableDescriptor = manifest.getTableDescriptor();
+    /*
+     * for HBase 0.96 or 0.98 HTableDescriptor tableDescriptor =
+     * FSTableDescriptors.getTableDescriptorFromFs(fs, tableInfoPath);
+     */
+    if (!tableDescriptor.getNameAsString().equals(tableName)) {
+      LOG.error("couldn't find Table Desc for table: " + tableName + " under tableInfoPath: "
+          + tableInfoPath.toString());
+      LOG.error("tableDescriptor.getNameAsString() = " + tableDescriptor.getNameAsString());
+    }
+    return tableDescriptor;
+  }
+
+  /**
+   * Given the backup root dir, backup id and the table name, return the backup image location,
+   * which is also where the backup manifest file is. return value look like:
+   * "hdfs://backup.hbase.org:9000/user/biadmin/backup1/default/t1_dn/backup_1396650096738"
+   * @param backupRootDir backup root directory
+   * @param backupId  backup id
+   * @param table table name
+   * @return backupPath String for the particular table
+   */
+  protected static String getTableBackupDir(String backupRootDir, String backupId, String table) {
+    TableName tableName = TableName.valueOf(table);
+    return backupRootDir + File.separator + tableName.getNamespaceAsString() + File.separator
+        + tableName.getQualifierAsString() + File.separator + backupId;
+  }
+
+  /**
+   * Given the backup root dir, backup id and the table name, return the backup image location,
+   * which is also where the backup manifest file is. return value look like:
+   * "hdfs://backup.hbase.org:9000/user/biadmin/backup1/default/t1_dn/backup_1396650096738"
+   * @param tableN table name
+   * @return backupPath for the particular table
+   */
+  protected Path getTableBackupPath(String tableN) {
+    TableName tableName = TableName.valueOf(tableN);
+    return new Path(this.backupRootPath, tableName.getNamespaceAsString() + File.separator
+      + tableName.getQualifierAsString() + File.separator + backupId);
+  }
+
+  /**
+   * return value represent path for:
+   * ".../user/biadmin/backup1/default/t1_dn/backup_1396650096738/.hbase-snapshot"
+   * @param tableName table name
+   * @return path for snapshot
+   */
+  protected Path getTableSnapshotPath(String tableName) {
+    return new Path(this.getTableBackupPath(tableName), HConstants.SNAPSHOT_DIR_NAME);
+  }
+
+  /**
+   * return value represent path for:
+   * "..../default/t1_dn/backup_1396650096738/.hbase-snapshot/snapshot_1396650097621_default_t1_dn"
+   * this path contains .snapshotinfo, .tabledesc (0.96 and 0.98) this path contains .snapshotinfo,
+   * .data.manifest (trunk)
+   * @param tableName table name
+   * @return path to table info
+   * @throws FileNotFoundException exception
+   * @throws IOException exception
+   */
+  protected Path getTableInfoPath(String tableName) throws FileNotFoundException, IOException {
+
+    Path tableSnapShotPath = this.getTableSnapshotPath(tableName);
+    Path tableInfoPath = null;
+
+    // can't build the path directly as the timestamp values are different
+    FileStatus[] snapshots = fs.listStatus(tableSnapShotPath);
+    for (FileStatus snapshot : snapshots) {
+      tableInfoPath = snapshot.getPath();
+      // SnapshotManifest.DATA_MANIFEST_NAME = "data.manifest";
+      if (tableInfoPath.getName().endsWith("data.manifest")) {
+        LOG.debug("find Snapshot Manifest");
+        break;
+      }
+    }
+    return tableInfoPath;
+  }
+
+  /**
+   * return value represent path for:
+   * ".../user/biadmin/backup1/default/t1_dn/backup_1396650096738/archive/data/default/t1_dn"
+   * @param tabelName table name
+   * @return path to table archive
+   * @throws IOException exception
+   */
+  protected Path getTableArchivePath(String tableName) throws IOException {
+    Path baseDir = new Path(getTableBackupPath(tableName), HConstants.HFILE_ARCHIVE_DIRECTORY);
+    Path dataDir = new Path(baseDir, HConstants.BASE_NAMESPACE_DIR);
+    Path archivePath = new Path(dataDir, TableName.valueOf(tableName).getNamespaceAsString());
+    Path tableArchivePath =
+        new Path(archivePath, TableName.valueOf(tableName).getQualifierAsString());
+    if (!fs.exists(tableArchivePath) || !fs.getFileStatus(tableArchivePath).isDirectory()) {
+      LOG.debug("Folder tableArchivePath: " + tableArchivePath.toString() + " does not exists");
+      tableArchivePath = null; // empty table has no archive
+    }
+    return tableArchivePath;
+  }
+
+  /**
+   * Given the backup root dir and the backup id, return the log file location for an incremental
+   * backup.
+   * @param backupRootDir backup root directory
+   * @param backupId backup id
+   * @return logBackupDir: ".../user/biadmin/backup1/WALs/backup_1396650096738"
+   */
+  protected static String getLogBackupDir(String backupRootDir, String backupId) {
+    return backupRootDir + File.separator + HConstants.HREGION_LOGDIR_NAME + File.separator
+        + backupId;
+  }
+
+  protected static Path getLogBackupPath(String backupRootDir, String backupId) {
+    return new Path(getLogBackupDir(backupRootDir, backupId));
+  }
+
+  private Path getManifestPath(String tableName) throws IOException {
+    Path manifestPath = new Path(getTableBackupPath(tableName), BackupManifest.FILE_NAME);
+
+    LOG.debug("Looking for " + manifestPath.toString());
+    if (!fs.exists(manifestPath)) {
+      // check log dir for incremental backup case
+      manifestPath =
+          new Path(getLogBackupDir(this.backupRootPath.toString(), this.backupId) + File.separator
+            + BackupManifest.FILE_NAME);
+      LOG.debug("Looking for " + manifestPath.toString());
+      if (!fs.exists(manifestPath)) {
+        String errorMsg =
+            "Could not find backup manifest for " + backupId + " in " + backupRootPath.toString();
+        throw new IOException(errorMsg);
+      }
+    }
+    return manifestPath;
+  }
+
+  protected BackupManifest getManifest(String tableName) throws IOException {
+    BackupManifest manifest = new BackupManifest(conf, this.getManifestPath(tableName));
+    return manifest;
+  }
+
+  /**
+   * Gets region list
+   * @param tableName table name
+   * @return RegionList region list
+   * @throws FileNotFoundException exception
+   * @throws IOException exception
+   */
+
+  protected ArrayList<Path> getRegionList(String tableName) throws FileNotFoundException,
+  IOException {
+    Path tableArchivePath = this.getTableArchivePath(tableName);
+    ArrayList<Path> regionDirList = new ArrayList<Path>();
+    FileStatus[] children = fs.listStatus(tableArchivePath);
+    for (FileStatus childStatus : children) {
+      // here child refer to each region(Name)
+      Path child = childStatus.getPath();
+      regionDirList.add(child);
+    }
+    return regionDirList;
+  }
+
+  /**
+   * Gets region list
+   * @param tableArchivePath table archive path
+   * @return RegionList region list
+   * @throws FileNotFoundException exception
+   * @throws IOException exception
+   */
+  protected ArrayList<Path> getRegionList(Path tableArchivePath) throws FileNotFoundException,
+  IOException {
+    ArrayList<Path> regionDirList = new ArrayList<Path>();
+    FileStatus[] children = fs.listStatus(tableArchivePath);
+    for (FileStatus childStatus : children) {
+      // here child refer to each region(Name)
+      Path child = childStatus.getPath();
+      regionDirList.add(child);
+    }
+    return regionDirList;
+  }
+
+  /**
+   * Counts the number of files in all subdirectories of an HBase tables, i.e. HFiles. And finds the
+   * maximum number of files in one HBase table.
+   * @param tableArchivePath archive path
+   * @return the maximum number of files found in 1 HBase table
+   * @throws IOException exception
+   */
+  protected int getMaxNumberOfFilesInSubDir(Path tableArchivePath) throws IOException {
+    int result = 1;
+    ArrayList<Path> regionPathList = this.getRegionList(tableArchivePath);
+    // tableArchivePath = this.getTableArchivePath(tableName);
+
+    if (regionPathList == null || regionPathList.size() == 0) {
+      throw new IllegalStateException("Cannot restore hbase table because directory '"
+          + tableArchivePath + "' is not a directory.");
+    }
+
+    for (Path regionPath : regionPathList) {
+      result = Math.max(result, getNumberOfFilesInDir(regionPath));
+    }
+    return result;
+  }
+
+  /**
+   * Counts the number of files in all subdirectories of an HBase table, i.e. HFiles.
+   * @param regionPath Path to an HBase table directory
+   * @return the number of files all directories
+   * @throws IOException exception
+   */
+  protected int getNumberOfFilesInDir(Path regionPath) throws IOException {
+    int result = 0;
+
+    if (!fs.exists(regionPath) || !fs.getFileStatus(regionPath).isDirectory()) {
+      throw new IllegalStateException("Cannot restore hbase table because directory '"
+          + regionPath.toString() + "' is not a directory.");
+    }
+
+    FileStatus[] tableDirContent = fs.listStatus(regionPath);
+    for (FileStatus subDirStatus : tableDirContent) {
+      FileStatus[] colFamilies = fs.listStatus(subDirStatus.getPath());
+      for (FileStatus colFamilyStatus : colFamilies) {
+        FileStatus[] colFamilyContent = fs.listStatus(colFamilyStatus.getPath());
+        result += colFamilyContent.length;
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Duplicate the backup image if it's on local cluster
+   * @see HStore#bulkLoadHFile(String, long)
+   * @see HRegionFileSystem#bulkLoadStoreFile(String familyName, Path srcPath, long seqNum)
+   * @param tableArchivePath archive path
+   * @return the new tableArchivePath 
+   * @throws IOException exception
+   */
+  protected Path checkLocalAndBackup(Path tableArchivePath) throws IOException {
+    // Move the file if it's on local cluster
+    boolean isCopyNeeded = false;
+
+    FileSystem srcFs = tableArchivePath.getFileSystem(conf);
+    FileSystem desFs = FileSystem.get(conf);
+    if (tableArchivePath.getName().startsWith("/")) {
+      isCopyNeeded = true;
+    } else {
+      // This should match what is done in @see HRegionFileSystem#bulkLoadStoreFile(String, Path,
+      // long)
+      if (srcFs.getUri().equals(desFs.getUri())) {
+        LOG.debug("cluster hold the backup image: " + srcFs.getUri() + "; local cluster node: "
+            + desFs.getUri());
+        isCopyNeeded = true;
+      }
+    }
+    if (isCopyNeeded) {
+      LOG.debug("File " + tableArchivePath + " on local cluster, back it up before restore");
+      Path tmpPath = new Path(RESTORE_TMP_PATH);
+      if (desFs.exists(tmpPath)) {
+        try {
+          desFs.delete(tmpPath, true);
+        } catch (IOException e) {
+          LOG.debug("Failed to delete path: " + tmpPath
+            + ", need to check whether restore target DFS cluster is healthy");
+        }
+      }
+      FileUtil.copy(srcFs, tableArchivePath, desFs, tmpPath, false, conf);
+      LOG.debug("Copied to temporary path on local cluster: " + tmpPath);
+      tableArchivePath = tmpPath;
+    }
+    return tableArchivePath;
+  }
+
+  /**
+   * Calculate region boundaries and add all the column families to the table descriptor
+   * @param regionDirList region dir list
+   * @return a set of keys to store the boundaries
+   */
+  protected byte[][] generateBoundaryKeys(ArrayList<Path> regionDirList)
+      throws FileNotFoundException, IOException {
+    TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    // Build a set of keys to store the boundaries
+    byte[][] keys = null;
+    // calculate region boundaries and add all the column families to the table descriptor
+    for (Path regionDir : regionDirList) {
+      LOG.debug("Parsing region dir: " + regionDir);
+      Path hfofDir = regionDir;
+
+      if (!fs.exists(hfofDir)) {
+        LOG.warn("HFileOutputFormat dir " + hfofDir + " not found");
+      }
+
+      FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
+      if (familyDirStatuses == null) {
+        throw new IOException("No families found in " + hfofDir);
+      }
+
+      for (FileStatus stat : familyDirStatuses) {
+        if (!stat.isDirectory()) {
+          LOG.warn("Skipping non-directory " + stat.getPath());
+          continue;
+        }
+        boolean isIgnore = false;
+        String pathName = stat.getPath().getName();
+        for (String ignore : ignoreDirs) {
+          if (pathName.contains(ignore)) {
+            LOG.warn("Skipping non-family directory" + pathName);
+            isIgnore = true;
+            break;
+          }
+        }
+        if (isIgnore) {
+          continue;
+        }
+        Path familyDir = stat.getPath();
+        LOG.debug("Parsing family dir [" + familyDir.toString() + " in region [" + regionDir + "]");
+        // Skip _logs, etc
+        if (familyDir.getName().startsWith("_") || familyDir.getName().startsWith(".")) {
+          continue;
+        }
+
+        // start to parse hfile inside one family dir
+        Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
+        for (Path hfile : hfiles) {
+          if (hfile.getName().startsWith("_") || hfile.getName().startsWith(".")
+              || StoreFileInfo.isReference(hfile.getName())
+              || HFileLink.isHFileLink(hfile.getName())) {
+            continue;
+          }
+          HFile.Reader reader = HFile.createReader(fs, hfile, new CacheConfig(conf), conf);
+          final byte[] first, last;
+          try {
+            reader.loadFileInfo();
+            first = reader.getFirstRowKey();
+            last = reader.getLastRowKey();
+            LOG.debug("Trying to figure out region boundaries hfile=" + hfile + " first="
+                + Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));
+
+            // To eventually infer start key-end key boundaries
+            Integer value = map.containsKey(first) ? (Integer) map.get(first) : 0;
+            map.put(first, value + 1);
+            value = map.containsKey(last) ? (Integer) map.get(last) : 0;
+            map.put(last, value - 1);
+          } finally {
+            reader.close();
+          }
+        }
+      }
+    }
+    keys = LoadIncrementalHFiles.inferBoundaries(map);
+    return keys;
+  }
+
+  /**
+   * Check whether the backup path exist
+   * @param backupStr backup
+   * @param conf configuration
+   * @return Yes if path exists
+   * @throws IOException exception
+   */
+  protected static boolean checkPathExist(String backupStr, Configuration conf) 
+    throws IOException {
+    boolean isExist = false;
+    Path backupPath = new Path(backupStr);
+    FileSystem fileSys = backupPath.getFileSystem(conf);
+    String targetFsScheme = fileSys.getUri().getScheme();
+    LOG.debug("Schema of given url: " + backupStr + " is: " + targetFsScheme);
+    if (fileSys.exists(backupPath)) {
+      isExist = true;
+    }
+    return isExist;
+  }
+
+  /**
+   * Check whether the backup image path and there is manifest file in the path.
+   * @param backupManifestMap If all the manifests are found, then they are put into this map
+   * @param tableArray the tables involved
+   * @throws IOException exception
+   */
+  protected void checkImageManifestExist(HashMap<String, BackupManifest> backupManifestMap,
+      String[] tableArray) throws IOException {
+
+    try {
+      for (String tableName : tableArray) {
+        BackupManifest manifest = this.getManifest(tableName);
+        backupManifestMap.put(tableName, manifest);
+      }
+    } catch (IOException e) {
+      String expMsg = e.getMessage();
+      if (expMsg.contains("No FileSystem for scheme")) {
+        if (expMsg.contains("gpfs")) {
+          LOG.error("Please change to use webhdfs url when "
+              + "the backup image to restore locates on gpfs cluster");
+        } else {
+          LOG.error("Unsupported filesystem scheme found in the backup target url, "
+              + "please check the url to make sure no typo in it");
+        }
+        throw e;
+      } else if (expMsg.contains("no authority supported")) {
+        LOG.error("Please change to use webhdfs url when "
+            + "the backup image to restore locates on gpfs cluster");
+        throw e;
+      } else {
+        LOG.error(expMsg);
+        throw e;
+      }
+    }
+  }
+
+  public static String join(String[] names) {
+    StringBuilder sb = new StringBuilder();
+    String sep = BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND;
+    for (String s : names) {
+      sb.append(sep).append(s);
+    }
+    return sb.toString();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalBackupManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalBackupManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalBackupManager.java
new file mode 100644
index 0000000..e91857f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalBackupManager.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+
+/**
+ * After a full backup was created, the incremental backup will only store the changes made
+ * after the last full or incremental backup.
+ *
+ * Creating the backup copies the logfiles in .logs and .oldlogs since the last backup timestamp.
+ *
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class IncrementalBackupManager {
+  // parent manager
+  private BackupManager backupManager;
+
+  public static final Log LOG = LogFactory.getLog(IncrementalBackupManager.class);
+
+  public IncrementalBackupManager(BackupManager bm) {
+    this.backupManager = bm;
+  }
+
+  /**
+   * Obtain the list of logs that need to be copied out for this incremental backup. The list is set
+   * in BackupContext.
+   * @param backupContext backup context
+   * @return The new HashMap of RS log timestamps after the log roll for this incremental backup.
+   * @throws IOException exception
+   */
+  public HashMap<String, String> getIncrBackupLogFileList(BackupContext backupContext)
+      throws IOException {
+    List<String> logList;
+    HashMap<String, String> newTimestamps;
+    HashMap<String, String> previousTimestampMins;
+
+    Configuration conf = BackupUtil.getConf();
+    String savedStartCode = backupManager.readBackupStartCode();
+
+    // key: tableName
+    // value: <RegionServer,PreviousTimeStamp>
+    HashMap<String, HashMap<String, String>> previousTimestampMap =
+        backupManager.readLogTimestampMap();
+
+    previousTimestampMins = BackupUtil.getRSLogTimestampMins(previousTimestampMap);
+
+    LOG.debug("StartCode " + savedStartCode + "for backupID " + backupContext.getBackupId());
+    LOG.debug("Timestamps " + previousTimestampMap);
+    // get all new log files from .logs and .oldlogs after last TS and before new timestamp
+    if (savedStartCode == null || 
+        previousTimestampMins == null || 
+          previousTimestampMins.isEmpty()) {
+      throw new IOException("Cannot read any previous back up timestamps from hbase:backup. "
+          + "In order to create an incremental backup, at least one full backup is needed.");
+    }
+
+    HBaseAdmin hbadmin = null;
+    Connection conn = null;
+    try {
+      LOG.info("Execute roll log procedure for incremental backup ...");
+      conn = ConnectionFactory.createConnection(conf);
+      hbadmin = (HBaseAdmin) conn.getAdmin();
+      hbadmin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
+        LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, new HashMap<String, String>());
+    } finally {
+      if (hbadmin != null) {
+        hbadmin.close();
+      }
+      if(conn != null){
+        conn.close();
+      }
+    }
+
+    newTimestamps = backupManager.readRegionServerLastLogRollResult();
+
+    logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
+
+    backupContext.setIncrBackupFileList(logList);
+
+    return newTimestamps;
+  }
+
+  /**
+   * For each region server: get all log files newer than the last timestamps but not newer than the
+   * newest timestamps.
+   * @param olderTimestamps the timestamp for each region server of the last backup.
+   * @param newestTimestamps the timestamp for each region server that the backup should lead to.
+   * @param conf the Hadoop and Hbase configuration
+   * @param savedStartCode the startcode (timestamp) of last successful backup.
+   * @return a list of log files to be backed up
+   * @throws IOException exception
+   */
+  private List<String> getLogFilesForNewBackup(HashMap<String, String> olderTimestamps,
+    HashMap<String, String> newestTimestamps, Configuration conf, String savedStartCode)
+        throws IOException {
+    LOG.debug("In getLogFilesForNewBackup()\n" + "olderTimestamps: " + olderTimestamps
+      + "\n newestTimestamps: " + newestTimestamps);
+    Path rootdir = FSUtils.getRootDir(conf);
+    Path logDir = new Path(rootdir, HConstants.HREGION_LOGDIR_NAME);
+    Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
+    FileSystem fs = rootdir.getFileSystem(conf);
+    NewestLogFilter pathFilter = new NewestLogFilter(conf);
+
+    List<String> resultLogFiles = new ArrayList<String>();
+    List<String> newestLogs = new ArrayList<String>();
+
+    /*
+     * The old region servers and timestamps info we kept in hbase:backup may be out of sync if new
+     * region server is added or existing one lost. We'll deal with it here when processing the
+     * logs. If data in hbase:backup has more hosts, just ignore it. If the .logs directory includes
+     * more hosts, the additional hosts will not have old timestamps to compare with. We'll just use
+     * all the logs in that directory. We always write up-to-date region server and timestamp info
+     * to hbase:backup at the end of successful backup.
+     */
+
+    FileStatus[] rss;
+    Path p;
+    String host;
+    String oldTimeStamp;
+    String currentLogFile;
+    String currentLogTS;
+
+    // Get the files in .logs.
+    rss = fs.listStatus(logDir);
+    for (FileStatus rs : rss) {
+      p = rs.getPath();
+      host = DefaultWALProvider.getServerNameFromWALDirectoryName(p).getHostname();
+      FileStatus[] logs;
+      oldTimeStamp = olderTimestamps.get(host);
+      // It is possible that there is no old timestamp in hbase:backup for this host if
+      // this region server is newly added after our last backup.
+      if (oldTimeStamp == null) {
+        logs = fs.listStatus(p);
+      } else {
+        pathFilter.setLastBackupTS(oldTimeStamp);
+        logs = fs.listStatus(p, pathFilter);
+      }
+      for (FileStatus log : logs) {
+        LOG.debug("currentLogFile: " + log.getPath().toString());
+        if (DefaultWALProvider.isMetaFile(log.getPath())) {
+          LOG.debug("Skip hbase:meta log file: " + log.getPath().getName());
+          continue;
+        }
+        currentLogFile = log.getPath().toString();
+        resultLogFiles.add(currentLogFile);
+        currentLogTS = BackupUtil.getCreationTime(log.getPath(), conf);
+        // newestTimestamps is up-to-date with the current list of hosts
+        // so newestTimestamps.get(host) will not be null.
+        if (Long.valueOf(currentLogTS) > Long.valueOf(newestTimestamps.get(host))) {
+          newestLogs.add(currentLogFile);
+        }
+      }
+    }
+
+    // Include the .oldlogs files too.
+    FileStatus[] oldlogs = fs.listStatus(oldLogDir);
+    for (FileStatus oldlog : oldlogs) {
+      p = oldlog.getPath();
+      currentLogFile = p.toString();
+      if (DefaultWALProvider.isMetaFile(p)) {
+        LOG.debug("Skip .meta log file: " + currentLogFile);
+        continue;
+      }
+      host = BackupUtil.parseHostFromOldLog(p);
+      currentLogTS = BackupUtil.getCreationTime(p, conf);
+      oldTimeStamp = olderTimestamps.get(host);
+      /*
+       * It is possible that there is no old timestamp in hbase:backup for this host. At the time of
+       * our last backup operation, this rs did not exist. The reason can be one of the two: 1. The
+       * rs already left/crashed. Its logs were moved to .oldlogs. 2. The rs was added after our
+       * last backup.
+       */
+      if (oldTimeStamp == null) {
+        if (Long.valueOf(currentLogTS) < Long.valueOf(savedStartCode)) {
+          // This log file is really old, its region server was before our last backup.
+          continue;
+        } else {
+          resultLogFiles.add(currentLogFile);
+        }
+      } else if (Long.valueOf(currentLogTS) > Long.valueOf(oldTimeStamp)) {
+        resultLogFiles.add(currentLogFile);
+      }
+
+      LOG.debug("resultLogFiles before removal of newestLogs: " + resultLogFiles);
+      // It is possible that a host in .oldlogs is an obsolete region server
+      // so newestTimestamps.get(host) here can be null.
+      // Even if these logs belong to a obsolete region server, we still need
+      // to include they to avoid loss of edits for backup.
+      String newTimestamp = newestTimestamps.get(host);
+      if (newTimestamp != null && Long.valueOf(currentLogTS) > Long.valueOf(newTimestamp)) {
+        newestLogs.add(currentLogFile);
+      }
+    }
+    LOG.debug("newestLogs: " + newestLogs);
+    // remove newest log per host because they are still in use
+    resultLogFiles.removeAll(newestLogs);
+    LOG.debug("resultLogFiles after removal of newestLogs: " + resultLogFiles);
+    return resultLogFiles;
+  }
+
+  class NewestLogFilter implements PathFilter {
+    private String lastBackupTS = "0";
+    final private Configuration conf;
+
+    public NewestLogFilter(Configuration conf) {
+      this.conf = conf;
+    }
+
+    protected void setLastBackupTS(String ts) {
+      this.lastBackupTS = ts;
+    }
+
+    @Override
+    public boolean accept(Path path) {
+      // skip meta table log -- ts.meta file
+      if (DefaultWALProvider.isMetaFile(path)) {
+        LOG.debug("Skip .meta log file: " + path.getName());
+        return false;
+      }
+      String timestamp;
+      try {
+        timestamp = BackupUtil.getCreationTime(path, conf);
+        return Long.valueOf(timestamp) > Long.valueOf(lastBackupTS);
+      } catch (IOException e) {
+        LOG.warn("Cannot read timestamp of log file " + path);
+        return false;
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java
new file mode 100644
index 0000000..72e4879
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface IncrementalRestoreService extends Configurable{
+
+  public void run(String logDirectory, String[] fromTables, String[] toTables)
+    throws IOException;
+}