You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2015/12/27 19:02:19 UTC

[3/6] hbase git commit: HBASE-14030 HBase Backup/Restore Phase 1 (Vladimir Rodionov)

http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreClient.java
new file mode 100644
index 0000000..ae21b33
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreClient.java
@@ -0,0 +1,496 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupManifest.BackupImage;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * The main class that interprets the given arguments and triggers the restore operation.
+ */
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class RestoreClient {
+
+  private static final Log LOG = LogFactory.getLog(RestoreClient.class);
+
+  // command line options accepted by the tool; populated by init()
+  private static Options opt;
+  // configuration shared by all static methods; assigned by init(), setConf() or getConf()
+  private static Configuration conf;
+  // images returned by the most recent successful restore_stage2 call made from restore_stage1
+  private static Set<BackupImage> lastRestoreImagesSet;
+
+  // delimiter between table names in the <tables> and [tableMapping] command line arguments
+  private static final String DELIMITER_IN_COMMAND = ",";
+
+  // names of the boolean switches registered in init() and queried in parseAndRun()
+  private static final String OPTION_OVERWRITE = "overwrite";
+  private static final String OPTION_CHECK = "check";
+  private static final String OPTION_AUTOMATIC = "automatic";
+
+  // help text printed by parseAndRun() when the positional arguments are missing or inconsistent
+  private static final String USAGE =
+      "Usage: hbase restore <backup_root_path> <backup_id> <tables> [tableMapping] \n"
+          + "       [-overwrite] [-check] [-automatic]\n"
+          + " backup_root_path  The parent location where the backup images are stored\n"
+          + " backup_id         The id identifying the backup image\n"
+          + " table(s)          Table(s) from the backup image to be restored.\n"
+          + "                   Tables are separated by comma.\n"
+          + " Options:\n"
+          + "   tableMapping    A comma separated list of target tables.\n"
+          + "                   If specified, each table in <tables> must have a mapping.\n"
+          + "   -overwrite      With this option, restore overwrites to the existing table "
+          + "if there's any in\n"
+          + "                   restore target. The existing table must be online before restore.\n"
+          + "   -check          With this option, restore sequence and dependencies are checked\n"
+          + "                   and verified without executing the restore\n"
+          + "   -automatic      With this option, all the dependencies are automatically restored\n"
+          + "                   together with this backup image following the correct order.\n"
+          + "                   The restore dependencies can be checked by using \"-check\" "
+          + "option,\n"
+          + "                   or using \"hbase backup describe\" command. Without this option, "
+          + "only\n" + "                   this backup image is restored\n";
+
+  /**
+   * Not instantiable: the class only exposes static entry points, so any attempt to construct
+   * it fails immediately.
+   */
+  private RestoreClient() {
+    throw new AssertionError("Instantiating utility class...");
+  }
+
+  /**
+   * Prepares the static state used by the command line entry point: registers the supported
+   * options, resolves the configuration and silences loggers that would clutter the output.
+   * @throws IOException declared for callers; nothing in this body raises it directly
+   */
+  protected static void init() throws IOException {
+    // switches understood by the restore command
+    Options supported = new Options();
+    supported.addOption(OPTION_OVERWRITE, false,
+        "Overwrite the data if any of the restore target tables exists");
+    supported.addOption(OPTION_CHECK, false, "Check restore sequence and dependencies");
+    supported.addOption(OPTION_AUTOMATIC, false, "Restore all dependencies");
+    supported.addOption("debug", false, "Enable debug logging");
+    opt = supported;
+
+    conf = getConf();
+
+    // keep unrelated loggers from interleaving with the command output
+    disableUselessLoggers();
+  }
+
+  /**
+   * Command line entry point: sets up the static state, then parses and executes the request.
+   * @param args raw command line arguments
+   * @throws IOException propagated from {@link #init()}
+   */
+  public static void main(String[] args) throws IOException {
+    RestoreClient.init();
+    RestoreClient.parseAndRun(args);
+  }
+
+  /**
+   * Parses the restore command line and runs the requested restore.
+   * <p>
+   * A parse error, fewer than three positional arguments, a source/target table count mismatch
+   * or an {@link IOException} raised by the restore each end the JVM with exit code -1.
+   * @param args raw command line arguments
+   */
+  private static void parseAndRun(String[] args) {
+    CommandLine parsed = null;
+    try {
+      parsed = new PosixParser().parse(opt, args);
+    } catch (ParseException e) {
+      LOG.error("Could not parse command", e);
+      System.exit(-1);
+    }
+
+    // "-debug" raises the log level for the whole backup package
+    Logger packageLogger = Logger.getLogger("org.apache.hadoop.hbase.backup");
+    if (parsed.hasOption("debug")) {
+      packageLogger.setLevel(Level.DEBUG);
+    }
+
+    // the three switches are false unless present on the command line
+    boolean isOverwrite = parsed.hasOption(OPTION_OVERWRITE);
+    boolean check = parsed.hasOption(OPTION_CHECK);
+    boolean autoRestore = parsed.hasOption(OPTION_AUTOMATIC);
+
+    if (isOverwrite) {
+      LOG.debug("Found -overwrite option in restore command, "
+          + "will overwrite to existing table if any in the restore target");
+    }
+    if (check) {
+      LOG.debug("Found -check option in restore command, "
+          + "will check and verify the dependencies");
+    }
+    if (autoRestore) {
+      LOG.debug("Found -automatic option in restore command, "
+          + "will automatically retore all the dependencies");
+    }
+
+    // positional arguments: <backup_root_path> <backup_id> <tables> [tableMapping]
+    String[] positional = parsed.getArgs();
+    if (positional.length < 3) {
+      System.out.println("ERROR: missing arguments");
+      System.out.println(USAGE);
+      System.exit(-1);
+    }
+
+    String backupRootDir = positional[0];
+    String backupId = positional[1];
+    String tables = positional[2];
+    String tableMapping = null;
+    if (positional.length > 3) {
+      tableMapping = positional[3];
+    }
+
+    String[] sTableArray = null;
+    if (tables != null) {
+      sTableArray = tables.split(DELIMITER_IN_COMMAND);
+    }
+    String[] tTableArray = null;
+    if (tableMapping != null) {
+      tTableArray = tableMapping.split(DELIMITER_IN_COMMAND);
+    }
+
+    // when a mapping is supplied it must name exactly one target per source table
+    boolean mappingGiven = tableMapping != null && tTableArray != null;
+    if (mappingGiven && sTableArray.length != tTableArray.length) {
+      System.err.println("ERROR: table mapping mismatch: " + tables + " : " + tableMapping);
+      System.out.println(USAGE);
+      System.exit(-1);
+    }
+
+    try {
+      Path rootPath = new Path(backupRootDir);
+      HBackupFileSystem hBackupFS = new HBackupFileSystem(conf, rootPath, backupId);
+      restore_stage1(hBackupFS, backupRootDir, backupId, check, autoRestore, sTableArray,
+        tTableArray, isOverwrite);
+    } catch (IOException e) {
+      System.err.println("ERROR: " + e.getMessage());
+      System.exit(-1);
+    }
+  }
+
+  /**
+   * Restore operation, stage 1: loads the backup manifests, optionally validates the image
+   * dependencies, checks the target tables and then hands over to stage 2.
+   * @param hBackupFS gives access to the backup image
+   * @param backupRootDir root directory of the backup image
+   * @param backupId id of the backup image to be restored
+   * @param check true to only verify the dependencies and return without restoring
+   * @param autoRestore true to validate and restore every image on the dependency list
+   * @param sTableArray tables to be restored
+   * @param tTableArray tables to restore to; when null the source table names are used
+   * @param isOverwrite true to restore into existing target tables, false to fail when a target
+   *          table already exists
+   * @return true when only the dependency check was performed, false when a restore was run
+   * @throws IOException if validation fails or any step of the restore fails
+   */
+  public static boolean restore_stage1(HBackupFileSystem hBackupFS, String backupRootDir,
+      String backupId, boolean check, boolean autoRestore, String[] sTableArray,
+      String[] tTableArray, boolean isOverwrite) throws IOException {
+
+    // manifests are loaded per source table before anything else happens
+    HashMap<String, BackupManifest> manifests = new HashMap<String, BackupManifest>();
+    hBackupFS.checkImageManifestExist(manifests, sTableArray);
+
+    try {
+      // dependency validation is only requested by -check and -automatic
+      if (check || autoRestore) {
+        if (!validate(manifests)) {
+          String errMsg = "Some dependencies are missing for restore";
+          LOG.error(errMsg);
+          throw new IOException(errMsg);
+        }
+        LOG.info("Checking backup images: ok");
+      }
+
+      // a pure check stops here
+      if (check) {
+        return true;
+      }
+
+      // without an explicit mapping every table is restored under its own name
+      String[] targets = (tTableArray != null) ? tTableArray : sTableArray;
+
+      checkTargetTables(targets, isOverwrite);
+
+      Set<BackupImage> restored =
+          restore_stage2(hBackupFS, manifests, sTableArray, targets, autoRestore);
+
+      LOG.info("Restore for " + Arrays.asList(sTableArray) + " are successful!");
+      lastRestoreImagesSet = restored;
+
+    } catch (IOException e) {
+      LOG.error("ERROR: restore failed with error: " + e.getMessage());
+      throw e;
+    }
+
+    // a restore was executed, so this was not a check-only run
+    return false;
+  }
+
+  /**
+   * Returns the image set recorded by the most recent restore that completed through
+   * {@link #restore_stage1}. The value is kept in a static field shared by all callers.
+   * @return the last recorded restore image set, or null if no restore has completed yet
+   */
+  public static Set<BackupImage> getLastRestoreImagesSet() {
+    return RestoreClient.lastRestoreImagesSet;
+  }
+
+  /**
+   * Checks that the backup directory of every image on each table's dependency list exists.
+   * Checking of one table stops at its first missing image; the remaining tables are still
+   * examined.
+   * @param backupManifestMap manifest per source table name
+   * @return true if no missing image directory was found
+   * @throws IOException if a path check fails
+   */
+  private static boolean validate(HashMap<String, BackupManifest> backupManifestMap)
+      throws IOException {
+    boolean allPresent = true;
+
+    for (Entry<String, BackupManifest> entry : backupManifestMap.entrySet()) {
+      String table = entry.getKey();
+
+      // iterate the dependencies in the images' natural order
+      TreeSet<BackupImage> ordered = new TreeSet<BackupImage>();
+      ArrayList<BackupImage> dependencies = entry.getValue().getDependentListByTable(table);
+      boolean hasDependencies = dependencies != null && !dependencies.isEmpty();
+      if (hasDependencies) {
+        ordered.addAll(dependencies);
+      }
+
+      // todo merge
+      LOG.debug("merge will be implemented in future jira");
+
+      LOG.info("Dependent image(s) from old to new:");
+      for (BackupImage image : ordered) {
+        String imageDir =
+            HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), table);
+        boolean present = HBackupFileSystem.checkPathExist(imageDir, getConf());
+        if (present) {
+          // TODO More validation?
+          LOG.info("Backup image: " + image.getBackupId() + " for '" + table + "' is available");
+          continue;
+        }
+        LOG.error("ERROR: backup image does not exist: " + imageDir);
+        allPresent = false;
+        break;
+      }
+    }
+    return allPresent;
+  }
+
+  /**
+   * Validates the restore target tables against the running cluster.
+   * <p>
+   * Tables that do not exist are only reported. If any target exists, the restore is rejected
+   * unless {@code isOverwrite} is set, and with {@code isOverwrite} it is rejected when any
+   * existing target is disabled.
+   * @param tTableArray target table names
+   * @param isOverwrite true if restoring into existing tables is allowed
+   * @throws IOException if the cluster cannot be queried, or if the targets violate the rules
+   *           described above
+   */
+  private static void checkTargetTables(String[] tTableArray, boolean isOverwrite)
+      throws IOException {
+    ArrayList<String> existTableList = new ArrayList<String>();
+    ArrayList<String> disabledTableList = new ArrayList<String>();
+
+    // Use getConf() rather than the raw field: restore_stage1 is public and may be reached
+    // without init()/setConf(), in which case the field would still be null here.
+    Connection conn = ConnectionFactory.createConnection(getConf());
+    try {
+      HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
+      try {
+        for (String tableName : tTableArray) {
+          TableName target = TableName.valueOf(tableName);
+          if (admin.tableExists(target)) {
+            existTableList.add(tableName);
+            if (admin.isTableDisabled(target)) {
+              disabledTableList.add(tableName);
+            }
+          } else {
+            LOG.info("HBase table " + tableName
+                + " does not exist. It will be created during the restore process");
+          }
+        }
+      } finally {
+        admin.close();
+      }
+    } finally {
+      // nested so the connection is released even when closing the admin fails
+      conn.close();
+    }
+
+    if (existTableList.isEmpty()) {
+      return;
+    }
+
+    if (!isOverwrite) {
+      LOG.error("Existing table found in the restore target, please add \"-overwrite\" "
+          + "option in the command if you mean to restore to these existing tables");
+      LOG.info("Existing table list in restore target: " + existTableList);
+      throw new IOException("Existing table found in target while no \"-overwrite\" "
+          + "option found");
+    }
+
+    if (!disabledTableList.isEmpty()) {
+      LOG.error("Found offline table in the restore target, "
+          + "please enable them before restore with \"-overwrite\" option");
+      LOG.info("Offline table list in restore target: " + disabledTableList);
+      throw new IOException(
+          "Found offline table in the target when restore with \"-overwrite\" option");
+    }
+  }
+
+  /**
+   * Restore operation, stage 2: resolves the backup image dependencies of each table and
+   * restores the selected images.
+   * <p>
+   * With {@code autoRestore} every image on the table's dependency list is restored in the
+   * images' natural order; otherwise only the image described by the table's manifest is
+   * restored and a warning is logged if it depends on other images.
+   * <p>
+   * NOTE(review): the result set is cleared at the start of every table, so the returned set
+   * holds only the images of the last table in {@code sTableArray} - confirm whether callers
+   * expect the images of all tables.
+   * @param hBackupFS gives access to the backup image (not used in this method body)
+   * @param backupManifestMap manifest per source table name
+   * @param sTableArray tables to be restored
+   * @param tTableArray target table for each entry of {@code sTableArray}, same order
+   * @param autoRestore true to restore all the backup images on the dependency list
+   * @return the images restored for the last processed table
+   * @throws IOException if a table has no manifest or a restore step fails
+   */
+  private static Set<BackupImage> restore_stage2(HBackupFileSystem hBackupFS,
+    HashMap<String, BackupManifest> backupManifestMap, String[] sTableArray,
+    String[] tTableArray, boolean autoRestore) throws IOException {
+    TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>();
+
+    for (int i = 0; i < sTableArray.length; i++) {
+      restoreImageSet.clear();
+      String table = sTableArray[i];
+      BackupManifest manifest = backupManifestMap.get(table);
+      if (manifest == null) {
+        // fail with a descriptive error instead of a NullPointerException below
+        throw new IOException("No backup manifest was loaded for table " + table);
+      }
+      List<BackupImage> depList = manifest.getDependentListByTable(table);
+
+      if (autoRestore) {
+        // Image list of this backup in natural order (old to new). The list may be null, as
+        // the other callers of getDependentListByTable already assume, so copy it defensively.
+        TreeSet<BackupImage> restoreList = new TreeSet<BackupImage>();
+        if (depList != null) {
+          restoreList.addAll(depList);
+        }
+        LOG.debug("need to clear merged Image. to be implemented in future jira");
+
+        for (BackupImage image : restoreList) {
+          restoreImage(image, table, tTableArray[i]);
+        }
+        restoreImageSet.addAll(restoreList);
+
+        if (!restoreImageSet.isEmpty()) {
+          LOG.info("Restore includes the following image(s):");
+          for (BackupImage image : restoreImageSet) {
+            LOG.info("  Backup: "
+                + image.getBackupId()
+                + " "
+                + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(),
+                  table));
+          }
+        }
+      } else {
+        BackupImage image = manifest.getBackupImage();
+        // The dependency list always contains self.
+        if (depList != null && depList.size() > 1) {
+          LOG.warn("Backup image " + image.getBackupId() + " depends on other images.\n"
+              + "this operation will only restore the delta contained within backupImage "
+              + image.getBackupId());
+        }
+        restoreImage(image, table, tTableArray[i]);
+        restoreImageSet.add(image);
+      }
+    }
+    return restoreImageSet;
+  }
+
+  /**
+   * Restores a single backup image of one table, either as a full restore or by replaying an
+   * incremental image, depending on the type recorded in the image's manifest.
+   * @param image backup image to restore from
+   * @param sTable table to be restored
+   * @param tTable table to be restored to
+   * @throws IOException if the restore fails
+   */
+  private static void restoreImage(BackupImage image, String sTable, String tTable)
+      throws IOException {
+
+    Configuration restoreConf = getConf();
+
+    String imageRoot = image.getRootDir();
+    LOG.debug("Image root dir " + imageRoot);
+    String imageId = image.getBackupId();
+
+    HBackupFileSystem imageFs = new HBackupFileSystem(restoreConf, new Path(imageRoot), imageId);
+    RestoreUtil restorer = new RestoreUtil(restoreConf, imageFs);
+    BackupManifest manifest = imageFs.getManifest(sTable);
+
+    Path tableBackupPath = imageFs.getTableBackupPath(sTable);
+
+    // todo: convert feature will be provided in a future jira
+    boolean converted = false;
+
+    boolean fullOrConverted =
+        manifest.getType().equals(BackupRestoreConstants.BACKUP_TYPE_FULL) || converted;
+    if (!fullOrConverted) {
+      // incremental backup: replay the logs stored with the image
+      String logBackupDir =
+          HBackupFileSystem.getLogBackupDir(image.getRootDir(), image.getBackupId());
+      LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from incremental backup image "
+          + logBackupDir);
+      String[] sources = new String[] { sTable };
+      String[] targets = new String[] { tTable };
+      restorer.incrementalRestoreTable(logBackupDir, sources, targets);
+    } else {
+      String imageKind = converted ? "converted" : "full";
+      LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from "
+          + imageKind + " backup image " + tableBackupPath.toString());
+      restorer.fullRestoreTable(tableBackupPath, sTable, tTable, converted);
+    }
+
+    LOG.info(sTable + " has been successfully restored to " + tTable);
+  }
+
+  /**
+   * Replaces the configuration shared by the static methods of this class.
+   * @param newConf the configuration to use from now on
+   */
+  public synchronized static void setConf(Configuration newConf) {
+    RestoreClient.conf = newConf;
+  }
+
+  /**
+   * Returns the shared configuration, creating it on first use by merging the HBase
+   * configuration into a fresh Hadoop {@link Configuration}.
+   * <p>
+   * The whole method holds the class lock, the same lock as {@link #setConf(Configuration)},
+   * so concurrent first calls create the configuration only once. The field is assigned only
+   * after the merge has finished, so callers of this method never see a half-built
+   * configuration.
+   * @return the shared configuration, never null
+   */
+  protected static synchronized Configuration getConf() {
+    if (conf == null) {
+      Configuration merged = new Configuration();
+      HBaseConfiguration.merge(merged, HBaseConfiguration.create());
+      conf = merged;
+    }
+    return conf;
+  }
+
+  /**
+   * Switches off the log4j loggers whose output would otherwise clutter the command output.
+   * For the ZooKeeper, HBase ZooKeeper and HBase client loggers the level is also reported at
+   * debug level before and after the change.
+   */
+  private static void disableUselessLoggers() {
+    // { logger name, message prefix before the change, message prefix after the change }
+    String[][] reported = {
+      { "org.apache.zookeeper", "Zookeeper log level before set: ",
+        "Zookeeper log level after set: " },
+      { "org.apache.hadoop.hbase.zookeeper", "HBase zookeeper log level before set: ",
+        "HBase Zookeeper log level after set: " },
+      { "org.apache.hadoop.hbase.client", "HBase client log level before set: ",
+        "HBase client log level after set: " } };
+    for (String[] entry : reported) {
+      Logger noisy = Logger.getLogger(entry[0]);
+      LOG.debug(entry[1] + noisy.getLevel());
+      noisy.setLevel(Level.OFF);
+      LOG.debug(entry[2] + noisy.getLevel());
+    }
+
+    // further related loggers are silenced without reporting
+    String[] silenced = { "org.apache.hadoop.hbase.io.hfile", "org.apache.hadoop.hbase.util",
+      "org.apache.hadoop.hbase.mapreduce" };
+    for (String loggerName : silenced) {
+      Logger.getLogger(loggerName).setLevel(Level.OFF);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreUtil.java
new file mode 100644
index 0000000..bdb7988
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreUtil.java
@@ -0,0 +1,503 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.NavigableSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+
+/**
+ * A collection of methods used by multiple classes to restore HBase tables.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class RestoreUtil {
+
+  public static final Log LOG = LogFactory.getLog(RestoreUtil.class);
+
+  // configuration used for file system access and cluster connections; set by the constructor
+  protected Configuration conf = null;
+
+  // accessor for the backup image being restored; set by the constructor
+  protected HBackupFileSystem hBackupFS = null;
+
+  // maps a source table name to the path recorded for it by restoreTableAndCreate
+  // (the value stored there is hBackupFS.getTableInfoPath(tableName))
+  private final HashMap<String, Path> snapshotMap = new HashMap<String, Path>();
+
+  /**
+   * Creates a restore helper bound to one backup image.
+   * @param conf configuration used for file system access and cluster connections
+   * @param hBackupFS accessor for the backup image to restore from
+   * @throws IOException declared for callers; nothing in this body raises it directly
+   */
+  public RestoreUtil(Configuration conf, HBackupFileSystem hBackupFS) throws IOException {
+    this.hBackupFS = hBackupFS;
+    this.conf = conf;
+  }
+
+  /**
+   * Restores an incremental backup image by handing the backed up WALs to the configured
+   * {@link IncrementalRestoreService}. Every target table must already exist, either created
+   * by the user or by a previous full restore. Currently tableNames and newTableNames only
+   * contain a single table; this will be expanded to multiple tables in the future.
+   * @param logDir incremental backup folder, which contains the WALs
+   * @param tableNames source table names (the tables that were backed up)
+   * @param newTableNames target table names (the tables to be restored to)
+   * @throws IOException if the two arrays differ in length, a target table does not exist, or
+   *           the restore fails
+   */
+  public void incrementalRestoreTable(String logDir, String[] tableNames, String[] newTableNames)
+      throws IOException {
+
+    if (tableNames.length != newTableNames.length) {
+      throw new IOException("Number of source tables and target tables does not match!");
+    }
+
+    // for incremental backup image, expect the table already created either by user or previous
+    // full backup. Here, check that all new tables exists
+    Connection conn = ConnectionFactory.createConnection(conf);
+    try {
+      HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
+      try {
+        for (String tableName : newTableNames) {
+          if (!admin.tableExists(TableName.valueOf(tableName))) {
+            // no explicit close here: the finally blocks release admin and connection once
+            throw new IOException("HBase table " + tableName
+              + " does not exist. Create the table first, e.g. by restoring a full backup.");
+          }
+        }
+        IncrementalRestoreService restoreService =
+            BackupRestoreServiceFactory.getIncrementalRestoreService(conf);
+
+        restoreService.run(logDir, tableNames, newTableNames);
+      } finally {
+        admin.close();
+      }
+    } finally {
+      // nested so the connection is released even when closing the admin fails
+      conn.close();
+    }
+  }
+
+  /**
+   * Restores a table from a full backup image, creating the target table if necessary.
+   * @param tableBackupPath path of the table's backup inside the image
+   * @param tableName source table name
+   * @param newTableName target table name
+   * @param converted true if the image is a converted one
+   * @throws IOException if the restore fails
+   */
+  public void fullRestoreTable(Path tableBackupPath, String tableName, String newTableName,
+      boolean converted) throws IOException {
+    // the delegate takes the backup path as its third argument
+    this.restoreTableAndCreate(tableName, newTableName, tableBackupPath, converted);
+  }
+
+  /**
+   * Restores one table from a full backup image: resolves the table descriptor, creates the
+   * target table if needed, bulk loads the archived HFiles region by region and finally
+   * replays any recovered edits.
+   * @param tableName source table name
+   * @param newTableName target table name; when null or empty the source name is used
+   * @param tableBackupPath path of the table's backup inside the image
+   * @param converted true if the image is a converted one (not supported yet, only logged)
+   * @throws IOException if the backup file system cannot be accessed
+   * @throws IllegalStateException if neither an archive directory nor a table descriptor is
+   *           found, or if any step of the actual restore fails
+   */
+  private void restoreTableAndCreate(String tableName, String newTableName, Path tableBackupPath,
+       boolean converted) throws IOException {
+    if (newTableName == null || newTableName.equals("")) {
+      newTableName = tableName;
+    }
+
+    FileSystem fileSys = tableBackupPath.getFileSystem(this.conf);
+
+    // get table descriptor first
+    HTableDescriptor tableDescriptor = null;
+
+    Path tableSnapshotPath = hBackupFS.getTableSnapshotPath(tableName);
+
+    if (fileSys.exists(tableSnapshotPath)) {
+      // snapshot path exist means the backup path is in HDFS
+      // check whether snapshot dir already recorded for target table
+      if (snapshotMap.get(tableName) != null) {
+        SnapshotDescription desc =
+            SnapshotDescriptionUtils.readSnapshotInfo(fileSys, tableSnapshotPath);
+        SnapshotManifest manifest = SnapshotManifest.open(conf, fileSys, tableSnapshotPath, desc);
+        tableDescriptor = manifest.getTableDescriptor();
+        LOG.debug("tableDescriptor.getNameAsString() = " + tableDescriptor.getNameAsString()
+          + " while tableName = " + tableName);
+      } else {
+        tableDescriptor = hBackupFS.getTableDesc(tableName);
+        LOG.debug("tableSnapshotPath=" + tableSnapshotPath.toString());
+        snapshotMap.put(tableName, hBackupFS.getTableInfoPath(tableName));
+      }
+      if (tableDescriptor == null) {
+        LOG.debug("Found no table descriptor in the snapshot dir, previous schema would be lost");
+      }
+    } else if (converted) {
+      // first check if this is a converted backup image
+      LOG.error("convert will be supported in a future jira");
+    }
+
+    Path tableArchivePath = hBackupFS.getTableArchivePath(tableName);
+    if (tableArchivePath == null) {
+      if (tableDescriptor == null) {
+        throw new IllegalStateException("Cannot restore hbase table " + tableName
+            + " because neither its archive directory nor its table descriptor was found.");
+      }
+      // find table descriptor but no archive dir means the table is empty, create table and exit
+      LOG.debug("find table descriptor but no archive dir for table " + tableName
+        + ", will only create table");
+      tableDescriptor.setName(Bytes.toBytes(newTableName));
+      checkAndCreateTable(tableBackupPath, tableName, newTableName, null, tableDescriptor);
+      return;
+    }
+
+    if (tableDescriptor == null) {
+      tableDescriptor = new HTableDescriptor(newTableName);
+    } else {
+      tableDescriptor.setName(Bytes.toBytes(newTableName));
+    }
+
+    if (!converted) {
+      // record all region dirs:
+      // load all files in dir
+      try {
+        ArrayList<Path> regionPathList = hBackupFS.getRegionList(tableName);
+
+        // should only try to create the table with all region informations, so we could pre-split
+        // the regions in fine grain
+        checkAndCreateTable(tableBackupPath, tableName, newTableName, regionPathList,
+          tableDescriptor);
+
+        // start real restore through bulkload (tableArchivePath is known to be non-null here)
+        // if the backup target is on local cluster, special action needed
+        Path tempTableArchivePath = hBackupFS.checkLocalAndBackup(tableArchivePath);
+        if (tempTableArchivePath.equals(tableArchivePath)) {
+          LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath);
+        } else {
+          regionPathList = hBackupFS.getRegionList(tempTableArchivePath); // point to the tempDir
+          LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath);
+        }
+
+        LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false);
+        for (Path regionPath : regionPathList) {
+          String regionName = regionPath.toString();
+          LOG.debug("Restoring HFiles from directory " + regionName);
+          String[] args = { regionName, newTableName };
+          loader.run(args);
+        }
+
+        // restore the recovered.edits if exists
+        replayRecoveredEditsIfAny(tableBackupPath, tableName, tableDescriptor);
+      } catch (Exception e) {
+        throw new IllegalStateException("Cannot restore hbase table", e);
+      }
+    } else {
+      LOG.debug("convert will be supported in a future jira");
+    }
+  }
+
+  /**
+   * Replays the recovered.edits files found under each backed up region directory into the
+   * target table.
+   * <p>
+   * A region without recovered edits is skipped and the remaining regions are still processed.
+   * When replaying a file fails and {@code hbase.skip.errors} is true, the file is moved aside
+   * and processing continues; otherwise the failure is rethrown.
+   * @param tableBackupPath path of the table's backup, used to resolve the file system
+   * @param tableName source table name
+   * @param newTableHtd descriptor of the target table
+   * @throws IOException if a file cannot be replayed and errors are not skipped
+   */
+  private void replayRecoveredEditsIfAny(Path tableBackupPath, String tableName,
+      HTableDescriptor newTableHtd) throws IOException {
+
+    LOG.debug("Trying to replay the recovered.edits if exist to the target table "
+        + newTableHtd.getNameAsString() + " from the backup of table " + tableName + ".");
+
+    FileSystem fs = tableBackupPath.getFileSystem(this.conf);
+    ArrayList<Path> regionDirs = hBackupFS.getRegionList(tableName);
+
+    if (regionDirs == null || regionDirs.size() == 0) {
+      LOG.warn("No recovered.edits to be replayed for empty backup of table " + tableName + ".");
+      return;
+    }
+
+    Connection conn = ConnectionFactory.createConnection(conf);
+    try {
+      for (Path regionDir : regionDirs) {
+        NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regionDir);
+
+        if (files == null || files.isEmpty()) {
+          LOG.warn("No recovered.edits found for the region " + regionDir.getName() + ".");
+          // only this region has nothing to replay; the other regions must still be handled
+          continue;
+        }
+
+        for (Path edits : files) {
+          if (edits == null || !fs.exists(edits)) {
+            LOG.warn("Null or non-existent edits file: " + edits);
+            continue;
+          }
+
+          HTable table = null;
+          try {
+            table = (HTable) conn.getTable(newTableHtd.getTableName());
+            replayRecoveredEdits(table, fs, edits);
+            table.flushCommits();
+          } catch (IOException e) {
+            boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
+            if (skipErrors) {
+              Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
+              LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
+                + "=true so continuing. Renamed " + edits + " as " + p, e);
+            } else {
+              throw e;
+            }
+          } finally {
+            // single close point for both the success and the failure path
+            if (table != null) {
+              table.close();
+            }
+          }
+        } // for each edit file under a region
+      } // for each region
+    } finally {
+      conn.close();
+    }
+  }
+
+  /**
+   * Apply one WAL entry to the restore target table.
+   * <p>
+   * Cells belonging to the WAL meta family are skipped. Consecutive cells that share the same row
+   * and the same type are aggregated into a single {@link Put} or {@link Delete}; whenever the row
+   * or the type changes the pending mutation is written out and a new one is started.
+   * @param htable The target table of restore
+   * @param key WAL key of the entry (currently not used by this method)
+   * @param val WAL edit holding the cells to apply
+   * @throws IOException if applying a mutation to the table fails
+   */
+  private void restoreEdit(HTable htable, WALKey key, WALEdit val) throws IOException {
+    Put put = null;
+    Delete del = null;
+    Cell lastKV = null;
+    for (Cell kv : val.getCells()) {
+      // filtering WAL meta entries
+      if (WALEdit.isMetaEditFamily(CellUtil.cloneFamily(kv))) {
+        continue;
+      }
+
+      // A WALEdit may contain multiple operations (HBASE-3584) and/or
+      // multiple rows (HBASE-5229).
+      // Aggregate as much as possible into a single Put/Delete
+      // operation before apply the action to the table.
+      if (lastKV == null || lastKV.getTypeByte() != kv.getTypeByte()
+          || !CellUtil.matchingRow(lastKV, kv)) {
+        // Row or type changed: write out the pending mutation and clear it, so that it is not
+        // sent to the table a second time at the next boundary or at the end of the edit.
+        if (put != null) {
+          applyAction(htable, put);
+          put = null;
+        }
+        if (del != null) {
+          applyAction(htable, del);
+          del = null;
+        }
+
+        if (CellUtil.isDelete(kv)) {
+          del = new Delete(CellUtil.cloneRow(kv));
+        } else {
+          put = new Put(CellUtil.cloneRow(kv));
+        }
+      }
+      if (CellUtil.isDelete(kv)) {
+        del.addDeleteMarker(kv);
+      } else {
+        put.add(kv);
+      }
+      lastKV = kv;
+    }
+    // write the residual mutation; at most one of the two is pending at this point
+    if (put != null) {
+      applyAction(htable, put);
+    }
+    if (del != null) {
+      applyAction(htable, del);
+    }
+  }
+
+  /**
+   * Send a single mutation to the table.
+   * <p>
+   * The mutation is copied before it is submitted because the caller keeps mutating its own
+   * instance. Puts are written with {@link Durability#SKIP_WAL}; deletes keep their durability.
+   * @param table table the mutation is applied to
+   * @param action the mutation, which must be a {@link Put} or a {@link Delete}
+   * @throws IOException if the table operation fails
+   * @throws IllegalArgumentException if the action is neither a Put nor a Delete
+   */
+  private void applyAction(HTable table, Mutation action) throws IOException {
+    boolean isPut = action instanceof Put;
+    if (!isPut && !(action instanceof Delete)) {
+      throw new IllegalArgumentException("action must be either Delete or Put");
+    }
+
+    if (isPut) {
+      // defensive copy, written without going through the WAL
+      Put copy = new Put((Put) action);
+      copy.setDurability(Durability.SKIP_WAL);
+      table.put(copy);
+      return;
+    }
+
+    // defensive copy of the delete
+    Delete copy = new Delete((Delete) action);
+    table.delete(copy);
+  }
+
+  /**
+   * Replay every entry of one recovered.edits file into the target table.
+   * <p>
+   * A file that ends prematurely (EOF) or whose read error was caused by a
+   * {@link java.text.ParseException} is moved aside and replay continues; any other
+   * {@link IOException} is rethrown to the caller.
+   * @param htable The target table of restore
+   * @param fs File system
+   * @param edits Recovered.edits to be replayed
+   * @throws IOException if the reader cannot be created or a read/apply error occurs that is
+   *           not attributed to a truncated or corrupt file
+   */
+  private void replayRecoveredEdits(HTable htable, FileSystem fs, Path edits) throws IOException {
+    LOG.debug("Replaying edits from " + edits + "; path=" + edits);
+
+    WAL.Reader reader = null;
+    try {
+      reader = WALFactory.createReader(fs, edits, this.conf);
+      long editsCount = 0;
+      WAL.Entry entry;
+
+      try {
+        while ((entry = reader.next()) != null) {
+          restoreEdit(htable, entry.getKey(), entry.getEdit());
+          editsCount++;
+        }
+        LOG.debug(editsCount + " edits from " + edits + " have been replayed.");
+
+      } catch (EOFException eof) {
+        Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
+        String msg =
+            "Encountered EOF. Most likely due to Master failure during "
+                + "log spliting, so we have this data in another edit.  "
+                + "Continuing, but renaming " + edits + " as " + p;
+        LOG.warn(msg, eof);
+      } catch (IOException ioe) {
+        // If the IOE resulted from bad file format,
+        // then this problem is idempotent and retrying won't help.
+        // The type is fully qualified on purpose: the simple name ParseException is bound to
+        // the commons-cli class imported by this file, which is never the cause of a read error.
+        if (ioe.getCause() instanceof java.text.ParseException) {
+          Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
+          String msg =
+              "File corruption encountered!  " + "Continuing, but renaming " + edits + " as " + p;
+          LOG.warn(msg, ioe);
+        } else {
+          // other IO errors may be transient (bad network connection,
+          // checksum exception on one datanode, etc). throw & retry
+          throw ioe;
+        }
+      }
+    } finally {
+      if (reader != null) {
+        reader.close();
+      }
+    }
+  }
+
+  /**
+   * Create a {@link LoadIncrementalHFiles} instance to be used to restore the HFiles of a full
+   * backup.
+   * <p>
+   * Before the loader is created, {@code hbase.rpc.timeout} in this client's configuration is
+   * raised to one minute per file when that exceeds the currently configured value; it is never
+   * lowered.
+   * @param tableArchivePath directory whose files are counted to size the timeout
+   * @param multipleTables if true the largest file count among the sub directories is used,
+   *          otherwise the file count of the directory itself
+   * @return the {@link LoadIncrementalHFiles} instance
+   * @throws IOException if the loader cannot be created
+   */
+  private LoadIncrementalHFiles createLoader(Path tableArchivePath, boolean multipleTables)
+      throws IOException {
+    // set configuration for restore:
+    // LoadIncrementalHFile needs more time
+    // <name>hbase.rpc.timeout</name> <value>600000</value>
+    final int milliSecInMin = 60000;
+    int previousMillis = this.conf.getInt("hbase.rpc.timeout", 0);
+    int numberOfFilesInDir =
+        multipleTables ? hBackupFS.getMaxNumberOfFilesInSubDir(tableArchivePath) : hBackupFS
+            .getNumberOfFilesInDir(tableArchivePath);
+    // 1 minute per file. Computed as long and clamped: with more than ~35,000 files the int
+    // product overflows to a negative value and the timeout would silently not be raised.
+    long calculatedMillis = (long) numberOfFilesInDir * milliSecInMin;
+    int resultMillis =
+        (int) Math.min((long) Integer.MAX_VALUE, Math.max(calculatedMillis, (long) previousMillis));
+    if (resultMillis > previousMillis) {
+      LOG.info("Setting configuration for restore with LoadIncrementalHFile: "
+          + "hbase.rpc.timeout to " + resultMillis / milliSecInMin
+          + " minutes, to handle the number of files in backup " + tableArchivePath);
+      this.conf.setInt("hbase.rpc.timeout", resultMillis);
+    }
+
+    LoadIncrementalHFiles loader = null;
+    try {
+      loader = new LoadIncrementalHFiles(this.conf);
+    } catch (Exception e1) {
+      throw new IOException(e1);
+    }
+    return loader;
+  }
+
+  /**
+   * Make sure the restore target table exists before the bulk load starts.
+   * <p>
+   * An existing target table is used as is. Otherwise the table is created from the given
+   * descriptor, pre-split on the boundary keys derived from the region directories when any
+   * are supplied.
+   * @param tableBackupPath path of the table backup (not used by this method)
+   * @param tableName name of the backed up table (not used by this method)
+   * @param targetTableName name of the table to check for and create
+   * @param regionDirList region directories used to compute split keys; may be null or empty
+   * @param htd descriptor used when the table has to be created
+   * @throws IOException wrapping any failure raised while checking for or creating the table
+   */
+  private void checkAndCreateTable(Path tableBackupPath, String tableName, String targetTableName,
+      ArrayList<Path> regionDirList, HTableDescriptor htd) throws IOException {
+    Connection connection = null;
+    HBaseAdmin admin = null;
+    try {
+      connection = ConnectionFactory.createConnection(conf);
+      admin = (HBaseAdmin) connection.getAdmin();
+
+      if (admin.tableExists(TableName.valueOf(targetTableName))) {
+        LOG.info("Using exising target table '" + targetTableName + "'");
+        return;
+      }
+
+      LOG.info("Creating target table '" + targetTableName + "'");
+      boolean noRegionDirs = regionDirList == null || regionDirList.isEmpty();
+      if (noRegionDirs) {
+        // nothing to derive split points from: create a single-region table
+        admin.createTable(htd);
+      } else {
+        // create the table with the region boundaries found in the backup
+        byte[][] splitKeys = hBackupFS.generateBoundaryKeys(regionDirList);
+        admin.createTable(htd, splitKeys);
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    } finally {
+      if (admin != null) {
+        admin.close();
+      }
+      if (connection != null) {
+        connection.close();
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
new file mode 100644
index 0000000..a3b5db5
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.backup.BackupCopyService;
+import org.apache.hadoop.hbase.backup.BackupHandler;
+import org.apache.hadoop.hbase.backup.BackupUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.DistCpOptions;
+/**
+ * Copier for backup operation. Basically, there are 2 types of copy. One is copying from a
+ * snapshot, which is based on extending ExportSnapshot. The other is copying incremental log
+ * files, which is based on extending DistCp with copy progress reporting through the
+ * {@link BackupHandler}.
+ *
+ * For now this is only a wrapper. The other features such as progress and increment backup will be
+ * implemented in future jira
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class MapReduceBackupCopyService implements BackupCopyService {
+  private static final Log LOG = LogFactory.getLog(MapReduceBackupCopyService.class);
+
+  private Configuration conf;
+  // private static final long BYTES_PER_MAP = 2 * 256 * 1024 * 1024;
+
+  // Accumulated progress within the whole backup process for the copy operation
+  private float progressDone = 0.1f;
+  // Accumulated number of source bytes of the DistCp copies finished by this service
+  private long bytesCopied = 0;
+  // Share of the overall progress that is not attributed to the copy itself
+  private static float INIT_PROGRESS = 0.1f;
+
+  // The percentage of the current copy task within the whole task if multiple time copies are
+  // needed. The default value is 100%, which means only 1 copy task for the whole.
+  private float subTaskPercntgInWholeTask = 1f;
+
+  public MapReduceBackupCopyService() {
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Get the current copy task percentage within the whole task if multiple copies are needed.
+   * @return the current copy task percentage
+   */
+  public float getSubTaskPercntgInWholeTask() {
+    return subTaskPercntgInWholeTask;
+  }
+
+  /**
+   * Set the current copy task percentage within the whole task if multiple copies are needed. Must
+   * be called before calling
+   * {@link #copy(BackupHandler, Configuration, Type, String[])}
+   * @param subTaskPercntgInWholeTask The percentage of the copy subtask
+   */
+  public void setSubTaskPercntgInWholeTask(float subTaskPercntgInWholeTask) {
+    this.subTaskPercntgInWholeTask = subTaskPercntgInWholeTask;
+  }
+
+  /**
+   * ExportSnapshot that additionally remembers the backup handler and the table it was created
+   * for. It does not override any ExportSnapshot behavior.
+   */
+  class SnapshotCopy extends ExportSnapshot {
+    private BackupHandler backupHandler;
+    private String table;
+
+    public SnapshotCopy(BackupHandler backupHandler, String table) {
+      super();
+      this.backupHandler = backupHandler;
+      this.table = table;
+    }
+
+    public BackupHandler getBackupHandler() {
+      return this.backupHandler;
+    }
+
+    public String getTable() {
+      return this.table;
+    }
+  }
+
+  // Extends DistCp for progress updating to hbase:backup
+  // during backup. Using DistCpV2 (MAPREDUCE-2765).
+  // Simply extend it and override execute() method to get the
+  // Job reference for progress updating.
+  // Only the argument "src1, [src2, [...]] dst" is supported,
+  // no more DistCp options.
+  class BackupDistCp extends DistCp {
+
+    private BackupHandler backupHandler;
+
+    public BackupDistCp(Configuration conf, DistCpOptions options, BackupHandler backupHandler)
+        throws Exception {
+      super(conf, options);
+      this.backupHandler = backupHandler;
+    }
+
+    /**
+     * Set up and submit the copy job through DistCp's non-public members (accessed by
+     * reflection), then poll the job until it completes, pushing progress to the backup handler
+     * whenever the computed progress value increases.
+     * @return the completed copy job
+     * @throws Exception if reflection, job setup, submission or polling fails
+     */
+    @Override
+    public Job execute() throws Exception {
+
+      // reflection preparation for private methods and fields
+      Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class;
+      Method methodCreateMetaFolderPath = classDistCp.getDeclaredMethod("createMetaFolderPath");
+      Method methodCreateJob = classDistCp.getDeclaredMethod("createJob");
+      Method methodCreateInputFileListing =
+          classDistCp.getDeclaredMethod("createInputFileListing", Job.class);
+      Method methodCleanup = classDistCp.getDeclaredMethod("cleanup");
+
+      Field fieldInputOptions = classDistCp.getDeclaredField("inputOptions");
+      Field fieldMetaFolder = classDistCp.getDeclaredField("metaFolder");
+      Field fieldJobFS = classDistCp.getDeclaredField("jobFS");
+      Field fieldSubmitted = classDistCp.getDeclaredField("submitted");
+
+      methodCreateMetaFolderPath.setAccessible(true);
+      methodCreateJob.setAccessible(true);
+      methodCreateInputFileListing.setAccessible(true);
+      methodCleanup.setAccessible(true);
+
+      fieldInputOptions.setAccessible(true);
+      fieldMetaFolder.setAccessible(true);
+      fieldJobFS.setAccessible(true);
+      fieldSubmitted.setAccessible(true);
+
+      // execute() logic starts here
+      assert fieldInputOptions.get(this) != null;
+      assert getConf() != null;
+
+      Job job = null;
+      try {
+        synchronized (this) {
+          // Don't cleanup while we are setting up.
+          fieldMetaFolder.set(this, methodCreateMetaFolderPath.invoke(this));
+          fieldJobFS.set(this, ((Path) fieldMetaFolder.get(this)).getFileSystem(getConf()));
+
+          job = (Job) methodCreateJob.invoke(this);
+        }
+        methodCreateInputFileListing.invoke(this, job);
+
+        // Get the total length of the source files
+        List<Path> srcs = ((DistCpOptions) fieldInputOptions.get(this)).getSourcePaths();
+        long totalSrcLgth = 0;
+        for (Path aSrc : srcs) {
+          totalSrcLgth += BackupUtil.getFilesLength(aSrc.getFileSystem(getConf()), aSrc);
+        }
+
+        // submit the copy job
+        job.submit();
+        fieldSubmitted.set(this, true);
+
+        // after submit the MR job, set its handler in backup handler for cancel process
+        // this.backupHandler.copyJob = job;
+
+        // Report the copy progress through the backup handler whenever the value increased;
+        // the polling interval is configurable and defaults to 500 ms
+        int progressReportFreq =
+            this.getConf().getInt("hbase.backup.progressreport.frequency", 500);
+        float lastProgress = progressDone;
+        while (!job.isComplete()) {
+          // progress so far plus this job's map progress, scaled by the sub task share and by
+          // the part of the overall progress left after INIT_PROGRESS
+          float newProgress =
+              progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
+
+          if (newProgress > lastProgress) {
+
+            // percentage rounded to one decimal place
+            BigDecimal progressData =
+                new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP);
+            String newProgressStr = progressData + "%";
+            LOG.info("Progress: " + newProgressStr);
+            this.backupHandler.updateProgress(newProgressStr, bytesCopied);
+            LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " + newProgressStr
+              + ".\"");
+            lastProgress = newProgress;
+          }
+          Thread.sleep(progressReportFreq);
+        }
+
+        // update the progress data after copy job complete
+        float newProgress =
+            progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
+        BigDecimal progressData =
+            new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP);
+
+        String newProgressStr = progressData + "%";
+        LOG.info("Progress: " + newProgressStr);
+
+        // accumulate the overall backup progress
+        progressDone = newProgress;
+        bytesCopied += totalSrcLgth;
+
+        this.backupHandler.updateProgress(newProgressStr, bytesCopied);
+        LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " + newProgressStr
+          + " - " + bytesCopied + " bytes copied.\"");
+
+      } finally {
+        // DistCp's cleanup is only invoked here when the job was never submitted
+        if (!fieldSubmitted.getBoolean(this)) {
+          methodCleanup.invoke(this);
+        }
+      }
+
+      String jobID = job.getJobID().toString();
+      job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
+
+      LOG.debug("DistCp job-id: " + jobID);
+      return job;
+    }
+
+  }
+
+  /**
+   * Do backup copy based on different types.
+   * @param handler The backup handler reference
+   * @param conf The hadoop configuration
+   * @param copyType The backup copy type
+   * @param options Options for customized ExportSnapshot or DistCp
+   * @return the result code of the underlying tool run, or 0 when the copy type is neither
+   *         FULL nor INCREMENTAL
+   * @throws IOException wrapping any exception raised while preparing or running the copy
+   */
+  public int copy(BackupHandler handler, Configuration conf, BackupCopyService.Type copyType,
+      String[] options) throws IOException {
+
+    int res = 0;
+
+    try {
+      if (copyType == Type.FULL) {
+        // options[1] is passed to getTableBySnapshot to resolve the table of this copy
+        SnapshotCopy snapshotCp =
+            new SnapshotCopy(handler, handler.getBackupContext().getTableBySnapshot(options[1]));
+        LOG.debug("Doing SNAPSHOT_COPY");
+        // Make a new instance of conf to be used by the snapshot copy class.
+        snapshotCp.setConf(new Configuration(conf));
+        res = snapshotCp.run(options);
+      } else if (copyType == Type.INCREMENTAL) {
+        LOG.debug("Doing COPY_TYPE_DISTCP");
+        setSubTaskPercntgInWholeTask(1f);
+
+        BackupDistCp distcp = new BackupDistCp(new Configuration(conf), null, handler);
+        // Handle a special case where the source file is a single file.
+        // In this case, distcp will not create the target dir. It just take the
+        // target as a file name and copy source file to the target (as a file name).
+        // We need to create the target dir before run distcp.
+        LOG.debug("DistCp options: " + Arrays.toString(options));
+        if (options.length == 2) {
+          Path dest = new Path(options[1]);
+          FileSystem destfs = dest.getFileSystem(conf);
+          if (!destfs.exists(dest)) {
+            destfs.mkdirs(dest);
+          }
+        }
+
+        res = distcp.run(options);
+      }
+      return res;
+
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java
new file mode 100644
index 0000000..deefbf7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.IncrementalRestoreService;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
+
+/**
+ * Incremental restore implementation that replays backed up WAL files with {@link WALPlayer}.
+ * The configuration of this service is the configuration of the wrapped player.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class MapReduceRestoreService implements IncrementalRestoreService {
+  public static final Log LOG = LogFactory.getLog(MapReduceRestoreService.class);
+
+  private WALPlayer player;
+
+  public MapReduceRestoreService() {
+    this.player = new WALPlayer();
+  }
+
+  /**
+   * Replay the WAL files under a directory into the given tables.
+   * @param logDir directory containing the log files to replay
+   * @param tableNames names of the tables whose edits are replayed
+   * @param newTableNames names of the tables the edits are written to
+   * @throws IOException if the player throws, or finishes with a non-zero return code
+   */
+  @Override
+  public void run(String logDir, String[] tableNames, String[] newTableNames) throws IOException {
+    String tableStr = HBackupFileSystem.join(tableNames);
+    String newTableStr = HBackupFileSystem.join(newTableNames);
+
+    // WALPlayer reads all files in arbitrary directory structure and creates a Map task for each
+    // log file
+
+    String[] playerArgs = { logDir, tableStr, newTableStr };
+    LOG.info("Restore incremental backup from directory " + logDir + " from hbase tables "
+        + tableStr + " to tables " + newTableStr);
+    int result;
+    try {
+      result = player.run(playerArgs);
+    } catch (Exception e) {
+      // keep the original exception as the cause so its stack trace is not lost
+      throw new IOException("cannot restore from backup directory " + logDir
+        + " (check Hadoop and HBase logs)", e);
+    }
+    // a non-zero return code means the player did not succeed; do not report success
+    if (result != 0) {
+      throw new IOException("cannot restore from backup directory " + logDir
+        + " (check Hadoop and HBase logs), WALPlayer return code = " + result);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return player.getConf();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.player.setConf(conf);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
new file mode 100644
index 0000000..4712548
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
@@ -0,0 +1,121 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.master;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.backup.BackupSystemTable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
+
+
+
+
+/**
+ * Implementation of a log cleaner that consults the backup system table before a log is
+ * deleted when its TTL is over.
+ * <p>
+ * When backup is disabled the configuration is never set on this delegate, and every file
+ * handed to {@link #getDeletableFiles(Iterable)} is reported as deletable.
+ */
+@InterfaceStability.Evolving
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class BackupLogCleaner extends BaseLogCleanerDelegate {
+  private static final Log LOG = LogFactory.getLog(BackupLogCleaner.class);
+
+  private boolean stopped = false;
+
+  public BackupLogCleaner() {
+  }
+
+  /**
+   * Select the files that may be deleted.
+   * @param files candidate log files
+   * @return all files when backup is disabled or no backup session is recorded; otherwise the
+   *         files for which the backup system table check returns true; an empty list when
+   *         the backup system table cannot be obtained
+   */
+  @Override
+  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
+    // all members of this class are null if backup is disabled,
+    // so we cannot filter the files
+    if (this.getConf() == null) {
+      return files;
+    }
+
+    try {
+      final BackupSystemTable table = BackupSystemTable.getTable(getConf());
+      // If we do not have recorded backup sessions
+      if (!table.hasBackupSessions()) {
+        return files;
+      }
+      return Iterables.filter(files, new Predicate<FileStatus>() {
+        @Override
+        public boolean apply(FileStatus file) {
+          String wal = file.getPath().toString();
+          try {
+            boolean logInSystemTable = table.checkWALFile(wal);
+            if (LOG.isDebugEnabled()) {
+              if (logInSystemTable) {
+                LOG.debug("Found log file in hbase:backup, deleting: " + wal);
+              } else {
+                LOG.debug("Didn't find this log in hbase:backup, keeping: " + wal);
+              }
+            }
+            return logInSystemTable;
+          } catch (IOException e) {
+            // log which file failed together with the stack trace
+            LOG.error("Failed to check log file in hbase:backup, keeping: " + wal, e);
+            return false;// keep file for a while, HBase failed
+          }
+        }
+      });
+    } catch (IOException e) {
+      LOG.error("Failed to get hbase:backup table, therefore will keep all files", e);
+      // nothing to delete
+      return new ArrayList<FileStatus>();
+    }
+
+  }
+
+  @Override
+  public void setConf(Configuration config) {
+    // If backup is disabled, keep all members null
+    if (!config.getBoolean(HConstants.BACKUP_ENABLE_KEY, HConstants.BACKUP_ENABLE_DEFAULT)) {
+      LOG.warn("Backup is disabled - allowing all wals to be deleted");
+      return;
+    }
+    super.setConf(config);
+  }
+
+  @Override
+  public void stop(String why) {
+    if (this.stopped) {
+      return;
+    }
+    this.stopped = true;
+    LOG.info("Stopping BackupLogCleaner");
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java
new file mode 100644
index 0000000..f96682f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.master;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.MetricsMaster;
+import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
+import org.apache.hadoop.hbase.procedure.Procedure;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Master side manager of the distributed roll log procedure. It starts the procedure on all
+ * online region servers through a {@link ProcedureCoordinator} and waits for its completion.
+ */
+public class LogRollMasterProcedureManager extends MasterProcedureManager {
+
+  public static final String ROLLLOG_PROCEDURE_SIGNATURE = "rolllog-proc";
+  public static final String ROLLLOG_PROCEDURE_NAME = "rolllog";
+  private static final Log LOG = LogFactory.getLog(LogRollMasterProcedureManager.class);
+
+  private MasterServices master;
+  private ProcedureCoordinator coordinator;
+  // true once the most recently executed procedure has completed successfully
+  private boolean done;
+
+  @Override
+  public void stop(String why) {
+    LOG.info("stop: " + why);
+  }
+
+  @Override
+  public boolean isStopped() {
+    return false;
+  }
+
+  /**
+   * Remember the master and build the procedure coordinator used by
+   * {@link #execProcedure(ProcedureDescription)}.
+   */
+  @Override
+  public void initialize(MasterServices master, MetricsMaster metricsMaster)
+      throws KeeperException, IOException, UnsupportedOperationException {
+    this.master = master;
+    this.done = false;
+
+    // the coordinator is named after the master and runs on a default pool created with 1
+    String coordinatorName = master.getServerName().toString();
+    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(coordinatorName, 1);
+
+    BaseCoordinatedStateManager stateManager =
+        (BaseCoordinatedStateManager) CoordinatedStateManagerFactory
+        .getCoordinatedStateManager(master.getConfiguration());
+    stateManager.initialize(master);
+
+    ProcedureCoordinatorRpcs rpcs =
+        stateManager.getProcedureCoordinatorRpcs(getProcedureSignature(), coordinatorName);
+    this.coordinator = new ProcedureCoordinator(rpcs, pool);
+  }
+
+  @Override
+  public String getProcedureSignature() {
+    return ROLLLOG_PROCEDURE_SIGNATURE;
+  }
+
+  /**
+   * Start the procedure on every online region server and block until it has completed.
+   * Failures are handed to the exception monitor and rethrown from there.
+   */
+  @Override
+  public void execProcedure(ProcedureDescription desc) throws IOException {
+    this.done = false;
+
+    // start the process on the RS
+    ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
+    List<ServerName> onlineServers = master.getServerManager().getOnlineServersList();
+    List<String> members = new ArrayList<String>();
+    for (ServerName server : onlineServers) {
+      members.add(server.toString());
+    }
+
+    Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), new byte[0], members);
+    if (proc == null) {
+      String failure = "Failed to submit distributed procedure for '" + desc.getInstance() + "'";
+      LOG.error(failure);
+      throw new IOException(failure);
+    }
+
+    try {
+      // wait for the procedure to complete. A timer thread is kicked off that should cancel this
+      // if it takes too long.
+      proc.waitForCompleted();
+      LOG.info("Done waiting - exec procedure for " + desc.getInstance());
+      LOG.info("Distributed roll log procedure is successful!");
+      this.done = true;
+    } catch (InterruptedException ie) {
+      monitor.receive(
+        new ForeignException("Interrupted while waiting for roll log procdure to finish", ie));
+      // restore the interrupt status for the caller
+      Thread.currentThread().interrupt();
+    } catch (ForeignException fe) {
+      monitor.receive(
+        new ForeignException("Exception while waiting for roll log procdure to finish", fe));
+    }
+    monitor.rethrowException();
+  }
+
+  @Override
+  public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
+    return done;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java
new file mode 100644
index 0000000..618748e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.regionserver;
+
+import java.util.HashMap;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.backup.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.procedure.ProcedureMember;
+import org.apache.hadoop.hbase.procedure.Subprocedure;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+
+
+/**
+ * Backup subprocedure that forces a WAL (log) roll on the hosting region server and records the
+ * pre-roll log number in the backup system table.
+ * <p>
+ * The work runs inside the barrier step ({@link #insideBarrier()}); acquiring and releasing the
+ * barrier are no-ops.
+ */
+public class LogRollBackupSubprocedure extends Subprocedure {
+  private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedure.class);
+
+  private final RegionServerServices rss;
+  private final LogRollBackupSubprocedurePool taskManager;
+
+  /**
+   * @param rss services of the hosting region server (source of the WAL, configuration and
+   *          server name)
+   * @param member procedure member, handed to {@link Subprocedure}
+   * @param errorListener error dispatcher, handed to {@link Subprocedure}
+   * @param wakeFrequency wake frequency, handed to {@link Subprocedure}
+   * @param timeout timeout, handed to {@link Subprocedure}
+   * @param taskManager pool that executes the roll log task; it is closed once the roll log step
+   *          has run, so it must not be reused afterwards
+   */
+  public LogRollBackupSubprocedure(RegionServerServices rss, ProcedureMember member,
+      ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
+      LogRollBackupSubprocedurePool taskManager) {
+
+    super(member, LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, errorListener,
+      wakeFrequency, timeout);
+    LOG.info("Constructing a LogRollBackupSubprocedure.");
+    this.rss = rss;
+    this.taskManager = taskManager;
+  }
+
+  /**
+   * Callable task that rolls the WAL and stores the result. TODO. We don't need a thread pool to
+   * execute roll log. This can be simplified with no use of subprocedurepool.
+   */
+  class RSRollLogTask implements Callable<Void> {
+    RSRollLogTask() {
+    }
+
+    /**
+     * Rolls the WAL writer and writes the log number observed <em>before</em> the roll to the
+     * backup system table, unless the table already holds a larger number for this host.
+     * @return always {@code null}
+     */
+    @Override
+    public Void call() throws Exception {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("++ DRPC started: " + rss.getServerName());
+      }
+      // Kept local: the WAL handle is only needed while this task runs, so there is no reason to
+      // publish it through a mutable field of the enclosing subprocedure.
+      FSHLog hlog = (FSHLog) rss.getWAL(null);
+      long filenum = hlog.getFilenum();
+
+      LOG.info("Trying to roll log in backup subprocedure, current log number: " + filenum);
+      hlog.rollWriter(true);
+      LOG.info("After roll log in backup subprocedure, current log number: " + hlog.getFilenum());
+      // write the (pre-roll) log number to hbase:backup.
+      BackupSystemTable table = BackupSystemTable.getTable(rss.getConfiguration());
+      // sanity check, good for testing: never replace a stored value with a smaller one
+      HashMap<String, String> serverTimestampMap = table.readRegionServerLastLogRollResult();
+      String host = rss.getServerName().getHostname();
+      String sts = serverTimestampMap.get(host);
+      if (sts != null && Long.parseLong(sts) > filenum) {
+        LOG.warn("Won't update server's last roll log result: current=" + sts + " new=" + filenum);
+        return null;
+      }
+      table.writeRegionServerLastLogRollResult(host, Long.toString(filenum));
+      // TODO: potential leak of HBase connection
+      // BackupSystemTable.close();
+      return null;
+    }
+
+  }
+
+  /**
+   * Submits the roll log task, waits for it, and rethrows any error seen by the monitor between
+   * the steps. The task pool is always closed afterwards so its worker thread is released even
+   * when the step succeeds.
+   * @throws ForeignException if the monitor holds an error or the task fails
+   */
+  private void rolllog() throws ForeignException {
+    try {
+      monitor.rethrowException();
+
+      taskManager.submitTask(new RSRollLogTask());
+      monitor.rethrowException();
+
+      // wait for everything to complete.
+      // NOTE(review): the boolean result (false when the wait was interrupted without an abort)
+      // is ignored here - confirm that reporting completion in that case is intended.
+      taskManager.waitForOutstandingTasks();
+      monitor.rethrowException();
+    } finally {
+      // The pool is created for this subprocedure only; without this it is never shut down on
+      // the success path because cleanup() runs only on error.
+      taskManager.close();
+    }
+
+  }
+
+  /**
+   * No work is done while acquiring the barrier; everything happens in {@link #insideBarrier()}.
+   */
+  @Override
+  public void acquireBarrier() throws ForeignException {
+    // do nothing, executing in inside barrier step.
+  }
+
+  /**
+   * Performs the log roll.
+   * @return currently always {@code null}
+   * @throws ForeignException if the roll log step fails
+   */
+  @Override
+  public byte[] insideBarrier() throws ForeignException {
+    rolllog();
+    // FIXME
+    return null;
+  }
+
+  /**
+   * Aborts the task pool so that tasks which have not finished are cancelled.
+   * @param e the error that triggered the cleanup
+   */
+  @Override
+  public void cleanup(Exception e) {
+    taskManager.abort("Aborting log roll subprocedure tasks for backup due to error", e);
+  }
+
+  /**
+   * Nothing to release: the log roll holds no resources across the barrier.
+   */
+  public void releaseBarrier() {
+    // NO OP
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java
new file mode 100644
index 0000000..1ca638c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.regionserver;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+
+/**
+ * Handle running each of the individual tasks for completing a backup procedure
+ * on a regionserver.
+ * <p>
+ * Tasks are submitted with {@link #submitTask(Callable)} and awaited with
+ * {@link #waitForOutstandingTasks()}. The list of submitted tasks is not synchronized.
+ * NOTE(review): this looks intended for use from a single subprocedure thread - confirm before
+ * sharing an instance between threads.
+ */
+public class LogRollBackupSubprocedurePool implements Closeable, Abortable {
+  private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedurePool.class);
+
+  /** Conf key for the maximum pool size of the executor that runs backup tasks */
+  private static final String CONCURENT_BACKUP_TASKS_KEY = "hbase.backup.region.concurrentTasks";
+  private static final int DEFAULT_CONCURRENT_BACKUP_TASKS = 3;
+
+  private final ExecutorCompletionService<Void> taskPool;
+  private final ThreadPoolExecutor executor;
+  private volatile boolean aborted;
+  private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
+  private final String name;
+
+  /**
+   * Creates the pool and its daemon-thread executor.
+   * @param name name used in the worker thread names and as the source of wrapped task errors
+   * @param conf configuration supplying the keep-alive time and the maximum pool size
+   */
+  public LogRollBackupSubprocedurePool(String name, Configuration conf) {
+    // configure the executor service
+    long keepAlive =
+        conf.getLong(LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_KEY,
+          LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_DEFAULT);
+    int threads = conf.getInt(CONCURENT_BACKUP_TASKS_KEY, DEFAULT_CONCURRENT_BACKUP_TASKS);
+    this.name = name;
+    // The keep-alive value is configured in milliseconds, so it must be applied as such
+    // (it used to be passed as seconds, turning the 60000 default into roughly 16 hours).
+    executor =
+        new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.MILLISECONDS,
+          new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("rs(" + name
+            + ")-backup-pool"));
+    if (keepAlive > 0) {
+      // Let the idle core thread exit as well, so a pool that is never closed does not keep a
+      // thread alive forever. allowCoreThreadTimeOut rejects a zero keep-alive, hence the guard.
+      executor.allowCoreThreadTimeOut(true);
+    }
+    taskPool = new ExecutorCompletionService<Void>(executor);
+  }
+
+  /**
+   * Submit a task to the pool and remember its future for {@link #waitForOutstandingTasks()}.
+   * @param task the task to run
+   */
+  public void submitTask(final Callable<Void> task) {
+    Future<Void> f = this.taskPool.submit(task);
+    futures.add(f);
+  }
+
+  /**
+   * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}.
+   * Whatever the outcome, tasks that are still not done when this method leaves are cancelled.
+   * @return <tt>true</tt> if every task completed, <tt>false</tt> if the wait was interrupted
+   *         while the pool was not aborted (the interrupt flag is restored in that case)
+   * @throws ForeignException if a task failed, or if the wait was interrupted after an abort
+   */
+  public boolean waitForOutstandingTasks() throws ForeignException {
+    LOG.debug("Waiting for backup procedure to finish.");
+
+    try {
+      for (Future<Void> f : futures) {
+        f.get();
+      }
+      return true;
+    } catch (InterruptedException e) {
+      if (aborted) {
+        throw new ForeignException("Interrupted and found to be aborted while waiting for tasks!",
+            e);
+      }
+      Thread.currentThread().interrupt();
+    } catch (ExecutionException e) {
+      // Pass a task's own ForeignException through unchanged; wrap anything else.
+      if (e.getCause() instanceof ForeignException) {
+        throw (ForeignException) e.getCause();
+      }
+      throw new ForeignException(name, e.getCause());
+    } finally {
+      // close off remaining tasks
+      for (Future<Void> f : futures) {
+        if (!f.isDone()) {
+          f.cancel(true);
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly
+   * finish
+   */
+  @Override
+  public void close() {
+    executor.shutdown();
+  }
+
+  /**
+   * Marks the pool as aborted and shuts the executor down immediately. Calls after the first
+   * one return without doing anything.
+   * @param why reason for the abort, used in the log message
+   * @param e the error that caused the abort, logged with the message
+   */
+  @Override
+  public void abort(String why, Throwable e) {
+    if (this.aborted) {
+      return;
+    }
+
+    this.aborted = true;
+    LOG.warn("Aborting because: " + why, e);
+    this.executor.shutdownNow();
+  }
+
+  /**
+   * @return <tt>true</tt> once {@link #abort(String, Throwable)} has been called
+   */
+  @Override
+  public boolean isAborted() {
+    return this.aborted;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java
new file mode 100644
index 0000000..aca190c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.regionserver;
+
+
+import java.io.IOException;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
+import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.procedure.ProcedureMember;
+import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
+import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
+import org.apache.hadoop.hbase.procedure.Subprocedure;
+import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+
+/**
+ * Region-server side manager of the backup roll log procedure for a {@link HRegionServer}.
+ * <p>
+ * It builds the {@link Subprocedure} this region server runs for a roll log request and owns the
+ * {@link ProcedureMember} and {@link ProcedureMemberRpcs} used to take part in the procedure.
+ * <p>
+ * Lifecycle: {@link #initialize(RegionServerServices)} has to run first because it creates the
+ * member and its RPCs, then {@link #start()} must be called; {@link #stop(boolean)} closes both
+ * the member and the RPCs.
+ */
+public class LogRollRegionServerProcedureManager extends RegionServerProcedureManager {
+
+  private static final Log LOG = LogFactory.getLog(LogRollRegionServerProcedureManager.class);
+
+  /** Conf key for the number of threads in the procedure member's pool */
+  public static final String BACKUP_REQUEST_THREADS_KEY = "hbase.backup.region.pool.threads";
+  /** Default number of threads in the procedure member's pool */
+  public static final int BACKUP_REQUEST_THREADS_DEFAULT = 10;
+
+  /** Conf key for the subprocedure timeout, also used as keep-alive of the member's pool */
+  public static final String BACKUP_TIMEOUT_MILLIS_KEY = "hbase.backup.timeout";
+  /** Default value, in milliseconds, for {@link #BACKUP_TIMEOUT_MILLIS_KEY} */
+  public static final long BACKUP_TIMEOUT_MILLIS_DEFAULT = 60000;
+
+  /** Conf key for the wake frequency, in milliseconds, handed to the subprocedure */
+  public static final String BACKUP_REQUEST_WAKE_MILLIS_KEY = "hbase.backup.region.wakefrequency";
+  /** Default value, in milliseconds, for {@link #BACKUP_REQUEST_WAKE_MILLIS_KEY} */
+  private static final long BACKUP_REQUEST_WAKE_MILLIS_DEFAULT = 500;
+
+  private RegionServerServices rss;
+  private ProcedureMemberRpcs memberRpcs;
+  private ProcedureMember member;
+
+  /**
+   * Creates an uninitialized manager; see {@link #initialize(RegionServerServices)}.
+   */
+  public LogRollRegionServerProcedureManager() {
+  }
+
+  /**
+   * Starts the member RPCs under this server's name so backup procedure requests are accepted.
+   */
+  @Override
+  public void start() {
+    final String serverName = rss.getServerName().toString();
+    this.memberRpcs.start(serverName, member);
+    LOG.info("Started region server backup manager.");
+  }
+
+  /**
+   * Closes the procedure member and then, even if that fails, the member RPCs.
+   * @param force only changes the wording of the log message
+   * @throws IOException if closing fails
+   */
+  @Override
+  public void stop(boolean force) throws IOException {
+    final String mode;
+    if (force) {
+      mode = "abruptly";
+    } else {
+      mode = "gracefully";
+    }
+    LOG.info("Stopping RegionServerBackupManager " + mode + ".");
+
+    try {
+      this.member.close();
+    } finally {
+      this.memberRpcs.close();
+    }
+  }
+
+  /**
+   * Creates the roll log subprocedure together with a fresh task pool and error dispatcher.
+   * @return Subprocedure to submit to the ProcedureMember.
+   * @throws IllegalStateException if the region server is stopping or stopped
+   */
+  public Subprocedure buildSubprocedure() {
+
+    // refuse new backup work while the hosting server is going down
+    final boolean shuttingDown = rss.isStopping() || rss.isStopped();
+    if (shuttingDown) {
+      throw new IllegalStateException("Can't start backup procedure on RS: " + rss.getServerName()
+        + ", because stopping/stopped!");
+    }
+
+    LOG.info("Attempting to run a roll log procedure for backup.");
+    final ForeignExceptionDispatcher dispatcher = new ForeignExceptionDispatcher();
+    final Configuration conf = rss.getConfiguration();
+    final long timeout = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT);
+    final long wakeFrequency =
+        conf.getLong(BACKUP_REQUEST_WAKE_MILLIS_KEY, BACKUP_REQUEST_WAKE_MILLIS_DEFAULT);
+
+    final String poolName = rss.getServerName().toString();
+    final LogRollBackupSubprocedurePool pool = new LogRollBackupSubprocedurePool(poolName, conf);
+    return new LogRollBackupSubprocedure(rss, member, dispatcher, wakeFrequency, timeout, pool);
+
+  }
+
+  /**
+   * Factory handed to the {@link ProcedureMember}; it delegates to
+   * {@link LogRollRegionServerProcedureManager#buildSubprocedure()} and ignores its arguments.
+   */
+  public class BackupSubprocedureBuilder implements SubprocedureFactory {
+
+    @Override
+    public Subprocedure buildSubprocedure(String name, byte[] data) {
+      final LogRollRegionServerProcedureManager owner = LogRollRegionServerProcedureManager.this;
+      return owner.buildSubprocedure();
+    }
+  }
+
+  /**
+   * Stores the region server services and creates the member RPCs and the procedure member.
+   * @param rss services of the hosting region server
+   * @throws IOException if the member RPCs cannot be obtained
+   */
+  @Override
+  public void initialize(RegionServerServices rss) throws IOException {
+    this.rss = rss;
+
+    // obtain the member-side RPCs from the coordinated state manager
+    final BaseCoordinatedStateManager stateManager = (BaseCoordinatedStateManager)
+        CoordinatedStateManagerFactory.getCoordinatedStateManager(rss.getConfiguration());
+    stateManager.initialize(rss);
+    this.memberRpcs = stateManager.getProcedureMemberRpcs(
+      LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE);
+
+    // read in the backup handler configuration properties
+    final Configuration conf = rss.getConfiguration();
+    final long keepAliveMillis =
+        conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT);
+    final int requestThreads =
+        conf.getInt(BACKUP_REQUEST_THREADS_KEY, BACKUP_REQUEST_THREADS_DEFAULT);
+    // create the actual cohort member
+    final String serverName = rss.getServerName().toString();
+    final ThreadPoolExecutor memberPool =
+        ProcedureMember.defaultPool(serverName, requestThreads, keepAliveMillis);
+    this.member = new ProcedureMember(memberRpcs, memberPool, new BackupSubprocedureBuilder());
+  }
+
+  /**
+   * NOTE(review): the member RPCs are created with
+   * LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE while this returns a literal -
+   * confirm the two are meant to differ.
+   * @return the signature string of this procedure manager
+   */
+  @Override
+  public String getProcedureSignature() {
+    final String signature = "backup-proc";
+    return signature;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
index ae36f08..3342743 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
@@ -17,7 +17,11 @@
  */
 package org.apache.hadoop.hbase.coordination;
 
+import java.io.IOException;
+
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.Server;
 
@@ -51,8 +55,21 @@ public abstract class BaseCoordinatedStateManager implements CoordinatedStateMan
    * Method to retrieve coordination for split log worker
    */
   public abstract  SplitLogWorkerCoordination getSplitLogWorkerCoordination();
+  
   /**
    * Method to retrieve coordination for split log manager
    */
   public abstract SplitLogManagerCoordination getSplitLogManagerCoordination();
+  /**
+   * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs}
+   * @param procType procedure type the RPCs are requested for
+   * @param coordNode name of the coordinator node (presumably the coordinating server's name;
+   *          confirm against callers)
+   * @return the coordinator-side procedure RPCs
+   * @throws IOException if the RPCs cannot be provided
+   */
+  public abstract ProcedureCoordinatorRpcs
+    getProcedureCoordinatorRpcs(String procType, String coordNode) throws IOException;
+  
+  /**
+   * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs}
+   * @param procType procedure type the RPCs are requested for, e.g. a procedure signature
+   * @return the member-side procedure RPCs
+   * @throws IOException if the RPCs cannot be provided
+   */
+  public abstract ProcedureMemberRpcs
+    getProcedureMemberRpcs(String procType) throws IOException;
+    
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
index 3e89be7..7cf4aab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
@@ -17,9 +17,15 @@
  */
 package org.apache.hadoop.hbase.coordination;
 
+import java.io.IOException;
+
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
+import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
 /**
@@ -49,9 +55,21 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
   @Override
   public SplitLogWorkerCoordination getSplitLogWorkerCoordination() {
     return splitLogWorkerCoordination;
-    }
+  }
+
   @Override
   public SplitLogManagerCoordination getSplitLogManagerCoordination() {
     return splitLogManagerCoordination;
   }
+
+  /**
+   * Builds a new {@link ZKProcedureCoordinatorRpcs} from this manager's watcher and the given
+   * procedure type and coordinator node.
+   */
+  @Override
+  public ProcedureCoordinatorRpcs getProcedureCoordinatorRpcs(String procType, String coordNode)
+      throws IOException {
+    final ProcedureCoordinatorRpcs coordinatorRpcs =
+        new ZKProcedureCoordinatorRpcs(watcher, procType, coordNode);
+    return coordinatorRpcs;
+  }
+
+  /**
+   * Builds a new {@link ZKProcedureMemberRpcs} from this manager's watcher and the given
+   * procedure type.
+   */
+  @Override
+  public ProcedureMemberRpcs getProcedureMemberRpcs(String procType) throws IOException {
+    final ProcedureMemberRpcs memberRpcs = new ZKProcedureMemberRpcs(watcher, procType);
+    return memberRpcs;
+  }
 }