Posted to commits@hbase.apache.org by te...@apache.org on 2016/09/19 20:31:39 UTC

hbase git commit: HBASE-15448 HBase Backup Phase 3: Restore optimization 2 (Vladimir Rodionov)

Repository: hbase
Updated Branches:
  refs/heads/HBASE-7912 fef921860 -> 6d1e7079f


HBASE-15448 HBase Backup Phase 3: Restore optimization 2 (Vladimir Rodionov)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6d1e7079
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6d1e7079
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6d1e7079

Branch: refs/heads/HBASE-7912
Commit: 6d1e7079f7f5eccf426dc0dd5136681bbc8e4d52
Parents: fef9218
Author: tedyu <yu...@gmail.com>
Authored: Mon Sep 19 13:31:19 2016 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Mon Sep 19 13:31:19 2016 -0700

----------------------------------------------------------------------
 .../backup/BackupRestoreServerFactory.java      |  12 +-
 .../hadoop/hbase/backup/HBackupFileSystem.java  |  28 ++
 .../hbase/backup/IncrementalRestoreService.java |  42 --
 .../hadoop/hbase/backup/RestoreService.java     |  50 +++
 .../backup/impl/RestoreTablesProcedure.java     | 402 -------------------
 .../hbase/backup/mapreduce/HFileSplitter.java   | 190 +++++++++
 .../mapreduce/MapReduceRestoreService.java      | 108 ++---
 .../backup/master/FullTableBackupProcedure.java |   1 -
 .../backup/master/RestoreTablesProcedure.java   | 387 ++++++++++++++++++
 .../hbase/backup/util/RestoreServerUtil.java    | 149 ++++---
 .../hbase/mapreduce/HFileInputFormat2.java      | 174 ++++++++
 .../org/apache/hadoop/hbase/master/HMaster.java |   2 +-
 .../hadoop/hbase/backup/TestBackupBase.java     |   2 +-
 .../hbase/backup/TestIncrementalBackup.java     |  34 +-
 14 files changed, 1016 insertions(+), 565 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java
index 25ec9d9..7644a4d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java
@@ -36,15 +36,15 @@ public final class BackupRestoreServerFactory {
   }
   
   /**
-   * Gets incremental restore service
+   * Gets backup restore service
    * @param conf - configuration
-   * @return incremental backup service instance
+   * @return backup restore service instance
    */
-  public static IncrementalRestoreService getIncrementalRestoreService(Configuration conf) {
-    Class<? extends IncrementalRestoreService> cls =
+  public static RestoreService getRestoreService(Configuration conf) {
+    Class<? extends RestoreService> cls =
         conf.getClass(HBASE_INCR_RESTORE_IMPL_CLASS, MapReduceRestoreService.class,
-          IncrementalRestoreService.class);
-    IncrementalRestoreService service =  ReflectionUtils.newInstance(cls, conf);
+          RestoreService.class);
+    RestoreService service =  ReflectionUtils.newInstance(cls, conf);
     service.setConf(conf);
     return service;
   }

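The factory keys off HBASE_INCR_RESTORE_IMPL_CLASS and falls back to
MapReduceRestoreService. A minimal caller sketch (the factory method and the
RestoreService.run signature come from this patch; paths and table names are
illustrative):

  Configuration conf = HBaseConfiguration.create();
  RestoreService service = BackupRestoreServerFactory.getRestoreService(conf);
  // Restore a full backup image of "t1" into "t1_restored"; paths are hypothetical.
  service.run(new Path[] { new Path("hdfs:///backup/data/t1") },
      new TableName[] { TableName.valueOf("t1") },
      new TableName[] { TableName.valueOf("t1_restored") },
      true /* fullBackupRestore */);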
http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
index 1fc0a92..a130a9b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
@@ -20,18 +20,25 @@
 package org.apache.hadoop.hbase.backup;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.impl.BackupManifest;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 
 /**
 * View to an on-disk Backup Image FileSystem
@@ -77,6 +84,27 @@ public class HBackupFileSystem {
     return new Path(getTableBackupDir(backupRootPath.toString(), backupId, tableName));
   }
 
+  
+  public static List<HRegionInfo> loadRegionInfos(TableName tableName, 
+    Path backupRootPath, String backupId, Configuration conf) throws IOException
+  {
+    Path backupTableRoot = getTableBackupPath(tableName, backupRootPath, backupId);
+    FileSystem fs = backupTableRoot.getFileSystem(conf);
+    RemoteIterator<LocatedFileStatus> it = fs.listFiles(backupTableRoot, true);
+    List<HRegionInfo> infos = new ArrayList<HRegionInfo>();
+    while(it.hasNext()) {
+      LocatedFileStatus lfs = it.next();
+      if(lfs.isFile() && lfs.getPath().toString().endsWith(HRegionFileSystem.REGION_INFO_FILE)) {
+        Path regionDir = lfs.getPath().getParent();
+        HRegionInfo info = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
+        infos.add(info);
+      }
+    }
+    
+    Collections.sort(infos);
+    return infos;
+  }
+  
   /**
    * Given the backup root dir and the backup id, return the log file location for an incremental
    * backup.

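The new loadRegionInfos helper recursively scans the table's backup directory
for .regioninfo files and returns the region descriptors in sorted order. A
usage sketch (backup root path and backup id are hypothetical):

  Configuration conf = HBaseConfiguration.create();
  List<HRegionInfo> regions = HBackupFileSystem.loadRegionInfos(
      TableName.valueOf("t1"), new Path("hdfs:///backup"), "backup_001", conf);
  for (HRegionInfo region : regions) {
    System.out.println(region.getRegionNameAsString());
  }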
http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java
deleted file mode 100644
index ae48480..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.backup;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public interface IncrementalRestoreService extends Configurable{
-
-  /**
-   * Run restore operation
-   * @param logDirectoryPaths - path array of WAL log directories
-   * @param fromTables - from tables
-   * @param toTables - to tables
-   * @throws IOException
-   */
-  public void run(Path[] logDirectoryPaths, TableName[] fromTables, TableName[] toTables)
-    throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreService.java
new file mode 100644
index 0000000..2da98c2
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreService.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+
+/**
+ * Backup restore service interface.
+ * A concrete implementation is provided by the backup provider.
+ */
+
+public interface RestoreService extends Configurable{
+
+  /**
+   * Run restore operation
+   * @param dirPaths - path array of WAL log directories
+   * @param fromTables - from tables
+   * @param toTables - to tables
+   * @param fullBackupRestore - full backup restore
+   * @throws IOException
+   */
+  public void run(Path[] dirPaths, TableName[] fromTables, 
+      TableName[] toTables, boolean fullBackupRestore)
+    throws IOException;
+}

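RestoreService replaces IncrementalRestoreService and carries a
fullBackupRestore flag, so one interface serves both restore phases. A backup
provider could plug in its own implementation via
HBASE_INCR_RESTORE_IMPL_CLASS (see BackupRestoreServerFactory above); a
skeletal sketch, with the class name hypothetical and all restore logic
elided:

  public class CustomRestoreService implements RestoreService {
    private Configuration conf;

    @Override
    public void run(Path[] dirPaths, TableName[] fromTables, TableName[] toTables,
        boolean fullBackupRestore) throws IOException {
      // provider-specific restore logic would go here
    }

    @Override
    public void setConf(Configuration conf) {
      this.conf = conf;
    }

    @Override
    public Configuration getConf() {
      return conf;
    }
  }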
http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesProcedure.java
deleted file mode 100644
index 7ac11de..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesProcedure.java
+++ /dev/null
@@ -1,402 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.backup.impl;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.BackupType;
-import org.apache.hadoop.hbase.backup.HBackupFileSystem;
-import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
-import org.apache.hadoop.hbase.backup.util.RestoreServerUtil;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.master.TableStateManager;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
-import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreTablesState;
-import org.apache.hadoop.security.UserGroupInformation;
-
-@InterfaceAudience.Private
-public class RestoreTablesProcedure
-    extends StateMachineProcedure<MasterProcedureEnv, RestoreTablesState>
-    implements TableProcedureInterface {
-  private static final Log LOG = LogFactory.getLog(RestoreTablesProcedure.class);
-
-  private final AtomicBoolean aborted = new AtomicBoolean(false);
-  private Configuration conf;
-  private String backupId;
-  private List<TableName> sTableList;
-  private List<TableName> tTableList;
-  private String targetRootDir;
-  private boolean isOverwrite;
-
-  public RestoreTablesProcedure() {
-    // Required by the Procedure framework to create the procedure on replay
-  }
-
-  public RestoreTablesProcedure(final MasterProcedureEnv env,
-      final String targetRootDir, String backupId, List<TableName> sTableList,
-      List<TableName> tTableList, boolean isOverwrite) throws IOException {
-    this.targetRootDir = targetRootDir;
-    this.backupId = backupId;
-    this.sTableList = sTableList;
-    this.tTableList = tTableList;
-    if (tTableList == null || tTableList.isEmpty()) {
-      this.tTableList = sTableList;
-    }
-    this.isOverwrite = isOverwrite;
-    this.setOwner(env.getRequestUser().getUGI().getShortUserName());
-  }
-
-  @Override
-  public byte[] getResult() {
-    return null;
-  }
-
-  /**
-   * Validate target Tables
-   * @param conn connection
-   * @param mgr table state manager
-   * @param tTableArray: target tables
-   * @param isOverwrite overwrite existing table
-   * @throws IOException exception
-   */
-  private void checkTargetTables(Connection conn, TableStateManager mgr, TableName[] tTableArray,
-      boolean isOverwrite)
-      throws IOException {
-    ArrayList<TableName> existTableList = new ArrayList<>();
-    ArrayList<TableName> disabledTableList = new ArrayList<>();
-
-    // check if the tables already exist
-    for (TableName tableName : tTableArray) {
-      if (MetaTableAccessor.tableExists(conn, tableName)) {
-        existTableList.add(tableName);
-        if (mgr.isTableState(tableName, TableState.State.DISABLED, TableState.State.DISABLING)) {
-          disabledTableList.add(tableName);
-        }
-      } else {
-        LOG.info("HBase table " + tableName
-            + " does not exist. It will be created during restore process");
-      }
-    }
-
-    if (existTableList.size() > 0) {
-      if (!isOverwrite) {
-        LOG.error("Existing table (" + existTableList + ") found in the restore target, please add "
-          + "\"-overwrite\" option in the command if you mean to restore to these existing tables");
-        throw new IOException("Existing table found in target while no \"-overwrite\" "
-            + "option found");
-      } else {
-        if (disabledTableList.size() > 0) {
-          LOG.error("Found offline table in the restore target, "
-              + "please enable them before restore with \"-overwrite\" option");
-          LOG.info("Offline table list in restore target: " + disabledTableList);
-          throw new IOException(
-              "Found offline table in the target when restore with \"-overwrite\" option");
-        }
-      }
-    }
-  }
-
-  /**
-   * Restore operation handle each backupImage in iterator
-   * @param conn the Connection
-   * @param it: backupImage iterator - ascending
-   * @param sTable: table to be restored
-   * @param tTable: table to be restored to
-   * @param truncateIfExists truncate table if it exists
-   * @throws IOException exception
-   */
-  private void restoreImages(MasterServices svc, Iterator<BackupImage> it, TableName sTable,
-      TableName tTable, boolean truncateIfExists) throws IOException {
-
-    // First image MUST be image of a FULL backup
-    BackupImage image = it.next();
-
-    String rootDir = image.getRootDir();
-    String backupId = image.getBackupId();
-    Path backupRoot = new Path(rootDir);
-
-    // We need hFS only for full restore (see the code)
-    RestoreServerUtil restoreTool = new RestoreServerUtil(conf, backupRoot, backupId);
-    BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId);
-
-    Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId);
-
-    // TODO: convert feature will be provided in a future JIRA
-    boolean converted = false;
-    String lastIncrBackupId = null;
-    List<String> logDirList = null;
-
-    // Scan incremental backups
-    if (it.hasNext()) {
-      // obtain the backupId for most recent incremental
-      logDirList = new ArrayList<String>();
-      while (it.hasNext()) {
-        BackupImage im = it.next();
-        String logBackupDir = HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId());
-        logDirList.add(logBackupDir);
-        lastIncrBackupId = im.getBackupId();
-      }
-    }
-    if (manifest.getType() == BackupType.FULL || converted) {
-      LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from "
-          + (converted ? "converted" : "full") + " backup image " + tableBackupPath.toString());
-      restoreTool.fullRestoreTable(svc, tableBackupPath, sTable, tTable,
-        converted, truncateIfExists, lastIncrBackupId);
-    } else { // incremental Backup
-      throw new IOException("Unexpected backup type " + image.getType());
-    }
-
-    // The rest one are incremental
-    if (logDirList != null) {
-      String logDirs = StringUtils.join(logDirList, ",");
-      LOG.info("Restoring '" + sTable + "' to '" + tTable
-          + "' from log dirs: " + logDirs);
-      String[] sarr = new String[logDirList.size()];
-      logDirList.toArray(sarr);
-      Path[] paths = org.apache.hadoop.util.StringUtils.stringToPath(sarr);
-      restoreTool.incrementalRestoreTable(svc, tableBackupPath, paths,
-          new TableName[] { sTable }, new TableName[] { tTable }, lastIncrBackupId);
-    }
-    LOG.info(sTable + " has been successfully restored to " + tTable);
-  }
-
-  /**
-   * Restore operation. Stage 2: resolved Backup Image dependency
-   * @param svc MasterServices
-   * @param backupManifestMap : tableName,  Manifest
-   * @param sTableArray The array of tables to be restored
-   * @param tTableArray The array of mapping tables to restore to
-   * @param isOverwrite overwrite
-   * @return set of BackupImages restored
-   * @throws IOException exception
-   */
-  private void restoreStage(MasterServices svc, HashMap<TableName, BackupManifest> backupManifestMap,
-      TableName[] sTableArray, TableName[] tTableArray, boolean isOverwrite) throws IOException {
-    TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>();
-    boolean truncateIfExists = isOverwrite;
-    try {
-      for (int i = 0; i < sTableArray.length; i++) {
-        TableName table = sTableArray[i];
-        BackupManifest manifest = backupManifestMap.get(table);
-        // Get the image list of this backup for restore in time order from old
-        // to new.
-        List<BackupImage> list = new ArrayList<BackupImage>();
-        list.add(manifest.getBackupImage());
-        List<BackupImage> depList = manifest.getDependentListByTable(table);
-        list.addAll(depList);
-        TreeSet<BackupImage> restoreList = new TreeSet<BackupImage>(list);
-        LOG.debug("need to clear merged Image. to be implemented in future jira");
-        restoreImages(svc, restoreList.iterator(), table, tTableArray[i], truncateIfExists);
-        restoreImageSet.addAll(restoreList);
-
-        if (restoreImageSet != null && !restoreImageSet.isEmpty()) {
-          LOG.info("Restore includes the following image(s):");
-          for (BackupImage image : restoreImageSet) {
-            LOG.info("Backup: "
-                + image.getBackupId()
-                + " "
-                + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(),
-                  table));
-          }
-        }
-      }
-    } catch (Exception e) {
-      LOG.error("Failed", e);
-      throw new IOException(e);
-    }
-    LOG.debug("restoreStage finished");
-  }
-
-  @Override
-  protected Flow executeFromState(final MasterProcedureEnv env, final RestoreTablesState state)
-      throws InterruptedException {
-    if (conf == null) {
-      conf = env.getMasterConfiguration();
-    }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(this + " execute state=" + state);
-    }
-    TableName[] tTableArray = tTableList.toArray(new TableName[tTableList.size()]);
-    try {
-      switch (state) {
-        case VALIDATION:
-
-          // check the target tables
-          checkTargetTables(env.getMasterServices().getConnection(),
-              env.getMasterServices().getTableStateManager(), tTableArray, isOverwrite);
-
-          setNextState(RestoreTablesState.RESTORE_IMAGES);
-          break;
-        case RESTORE_IMAGES:
-          TableName[] sTableArray = sTableList.toArray(new TableName[sTableList.size()]);
-          HashMap<TableName, BackupManifest> backupManifestMap = new HashMap<>();
-          // check and load backup image manifest for the tables
-          Path rootPath = new Path(targetRootDir);
-          HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath,
-            backupId);
-          restoreStage(env.getMasterServices(), backupManifestMap, sTableArray,
-              tTableArray, isOverwrite);
-
-          return Flow.NO_MORE_STATE;
-
-        default:
-          throw new UnsupportedOperationException("unhandled state=" + state);
-      }
-    } catch (IOException e) {
-      setFailure("restore-table", e);
-    }
-    return Flow.HAS_MORE_STATE;
-  }
-
-  @Override
-  protected void rollbackState(final MasterProcedureEnv env, final RestoreTablesState state)
-      throws IOException {
-  }
-
-  @Override
-  protected RestoreTablesState getState(final int stateId) {
-    return RestoreTablesState.valueOf(stateId);
-  }
-
-  @Override
-  protected int getStateId(final RestoreTablesState state) {
-    return state.getNumber();
-  }
-
-  @Override
-  protected RestoreTablesState getInitialState() {
-    return RestoreTablesState.VALIDATION;
-  }
-
-  @Override
-  protected void setNextState(final RestoreTablesState state) {
-    if (aborted.get()) {
-      setAbortFailure("snapshot-table", "abort requested");
-    } else {
-      super.setNextState(state);
-    }
-  }
-
-  @Override
-  public boolean abort(final MasterProcedureEnv env) {
-    aborted.set(true);
-    return true;
-  }
-
-  @Override
-  public void toStringClassDetails(StringBuilder sb) {
-    sb.append(getClass().getSimpleName());
-    sb.append(" (targetRootDir=");
-    sb.append(targetRootDir);
-    sb.append(" isOverwrite= ");
-    sb.append(isOverwrite);
-    sb.append(" backupId= ");
-    sb.append(backupId);
-    sb.append(")");
-  }
-
-  MasterProtos.RestoreTablesRequest toRestoreTables() {
-    MasterProtos.RestoreTablesRequest.Builder bldr = MasterProtos.RestoreTablesRequest.newBuilder();
-    bldr.setOverwrite(isOverwrite).setBackupId(backupId);
-    bldr.setBackupRootDir(targetRootDir);
-    for (TableName table : sTableList) {
-      bldr.addTables(ProtobufUtil.toProtoTableName(table));
-    }
-    for (TableName table : tTableList) {
-      bldr.addTargetTables(ProtobufUtil.toProtoTableName(table));
-    }
-    return bldr.build();
-  }
-
-  @Override
-  public void serializeStateData(final OutputStream stream) throws IOException {
-    super.serializeStateData(stream);
-
-    MasterProtos.RestoreTablesRequest restoreTables = toRestoreTables();
-    restoreTables.writeDelimitedTo(stream);
-  }
-
-  @Override
-  public void deserializeStateData(final InputStream stream) throws IOException {
-    super.deserializeStateData(stream);
-
-    MasterProtos.RestoreTablesRequest proto =
-        MasterProtos.RestoreTablesRequest.parseDelimitedFrom(stream);
-    backupId = proto.getBackupId();
-    targetRootDir = proto.getBackupRootDir();
-    isOverwrite = proto.getOverwrite();
-    sTableList = new ArrayList<>(proto.getTablesList().size());
-    for (HBaseProtos.TableName table : proto.getTablesList()) {
-      sTableList.add(ProtobufUtil.toTableName(table));
-    }
-    tTableList = new ArrayList<>(proto.getTargetTablesList().size());
-    for (HBaseProtos.TableName table : proto.getTargetTablesList()) {
-      tTableList.add(ProtobufUtil.toTableName(table));
-    }
-  }
-
-  @Override
-  public TableName getTableName() {
-    return TableName.BACKUP_TABLE_NAME;
-  }
-
-  @Override
-  public TableOperationType getTableOperationType() {
-    return TableOperationType.RESTORE;
-  }
-
-  @Override
-  protected boolean acquireLock(final MasterProcedureEnv env) {
-    if (env.waitInitialized(this)) {
-      return false;
-    }
-    return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME);
-  }
-
-  @Override
-  protected void releaseLock(final MasterProcedureEnv env) {
-    env.getProcedureQueue().releaseTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitter.java
new file mode 100644
index 0000000..c69a335
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitter.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileInputFormat2;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A tool to split HFiles into new region boundaries as an M/R job.
+ * The tool generates HFiles for later bulk importing.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class HFileSplitter extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(HFileSplitter.class);
+  final static String NAME = "HFileSplitter";
+  public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output";
+  public final static String TABLES_KEY = "hfile.input.tables";
+  public final static String TABLE_MAP_KEY = "hfile.input.tablesmap";
+  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+
+  public HFileSplitter(){
+  }
+
+  protected HFileSplitter(final Configuration c) {
+    super(c);
+  }
+
+  /**
+   * A mapper that just writes out cells.
+   * This one can be used together with {@link KeyValueSortReducer}
+   */
+  static class HFileCellMapper
+    extends Mapper<NullWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
+
+    @Override
+    public void map(NullWritable key, KeyValue value, Context context) throws IOException,
+        InterruptedException {
+      // Convert value to KeyValue if subclass
+      if (!value.getClass().equals(KeyValue.class)) {
+        value =
+            new KeyValue(value.getRowArray(), value.getRowOffset(), (int) value.getRowLength(),
+                value.getFamilyArray(), value.getFamilyOffset(), (int) value.getFamilyLength(),
+                value.getQualifierArray(), value.getQualifierOffset(),
+                (int) value.getQualifierLength(), value.getTimestamp(), Type.codeToType(value
+                    .getTypeByte()), value.getValueArray(), value.getValueOffset(),
+                value.getValueLength());
+      }
+      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value);
+    }
+
+    @Override
+    public void setup(Context context) throws IOException {
+      // do nothing
+    }
+  }
+
+
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param args  The command line parameters.
+   * @return The newly created job.
+   * @throws IOException When setting up the job fails.
+   */
+  public Job createSubmittableJob(String[] args) throws IOException {
+    Configuration conf = getConf();
+    String inputDirs = args[0];
+    String tabName = args[1];
+    conf.setStrings(TABLES_KEY, tabName);
+    Job job =
+        Job.getInstance(conf,
+          conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
+    job.setJarByClass(HFileSplitter.class);
+    FileInputFormat.addInputPaths(job, inputDirs);
+    job.setInputFormatClass(HFileInputFormat2.class);
+    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+    if (hfileOutPath != null) {
+      LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
+      TableName tableName = TableName.valueOf(tabName);
+      job.setMapperClass(HFileCellMapper.class);
+      job.setReducerClass(KeyValueSortReducer.class);
+      Path outputDir = new Path(hfileOutPath);
+      FileOutputFormat.setOutputPath(job, outputDir);
+      job.setMapOutputValueClass(KeyValue.class);
+      try (Connection conn = ConnectionFactory.createConnection(conf);
+          Table table = conn.getTable(tableName);
+          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+        HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
+      }
+      LOG.debug("success configuring load incremental job");
+
+      TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
+        com.google.common.base.Preconditions.class);
+    } else {
+      throw new IOException("No bulk output directory specified");
+    }
+    return job;
+  }
+
+ 
+  /**
+   * Print usage
+   * @param errorMsg Error message.  Can be null.
+   */
+  private void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    System.err.println("Usage: " + NAME + " [options] <HFile inputdir(s)> <table>");
+    System.err.println("Read all HFile's for <table> and split them to <table> region boundaries.");
+    System.err.println("<table>  table to load.\n");
+    System.err.println("To generate HFiles for a bulk data load, pass the option:");
+    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
+    System.err.println("Other options:");
+    System.err.println("   -D " + JOB_NAME_CONF_KEY
+        + "=jobName - use the specified mapreduce job name for the HFile splitter");
+    System.err.println("For performance also consider the following options:\n"
+        + "  -Dmapreduce.map.speculative=false\n" + "  -Dmapreduce.reduce.speculative=false");
+  }
+
+  /**
+   * Main entry point.
+   *
+   * @param args  The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    int ret = ToolRunner.run(new HFileSplitter(HBaseConfiguration.create()), args);
+    System.exit(ret);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      usage("Wrong number of arguments: " + args.length);
+      System.exit(-1);
+    }
+    Job job = createSubmittableJob(args);
+    int result =job.waitForCompletion(true) ? 0 : 1;
+    return result; 
+  }
+}
+

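HFileSplitter is a standard Hadoop Tool, so it can also be driven through
ToolRunner; note that BULK_OUTPUT_CONF_KEY must be set, or
createSubmittableJob throws. A sketch (input directory, output directory and
table name are hypothetical):

  Configuration conf = HBaseConfiguration.create();
  conf.set(HFileSplitter.BULK_OUTPUT_CONF_KEY, "/tmp/hfile-split-out");
  int rc = ToolRunner.run(conf, new HFileSplitter(),
      new String[] { "/backup/data/default/t1", "t1" });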
http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java
index c47d6ed..18c1f86 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java
@@ -1,13 +1,13 @@
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * with the License. You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -24,54 +24,67 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.IncrementalRestoreService;
+import org.apache.hadoop.hbase.backup.RestoreService;
 import org.apache.hadoop.hbase.backup.util.BackupServerUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.mapreduce.WALPlayer;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.Tool;
 
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class MapReduceRestoreService implements IncrementalRestoreService {
+public class MapReduceRestoreService implements RestoreService {
   public static final Log LOG = LogFactory.getLog(MapReduceRestoreService.class);
 
-  private WALPlayer player;
+  private Tool player;
+  private Configuration conf;
 
   public MapReduceRestoreService() {
-    this.player = new WALPlayer();
   }
 
   @Override
-  public void run(Path[] logDirPaths, TableName[] tableNames, TableName[] newTableNames)
-      throws IOException {
+  public void run(Path[] dirPaths, TableName[] tableNames, TableName[] newTableNames,
+      boolean fullBackupRestore) throws IOException {
 
-    // WALPlayer reads all files in arbitrary directory structure and creates a Map task for each
-    // log file
-    String logDirs = StringUtils.join(logDirPaths, ",");
-    LOG.info("Restore incremental backup from directory " + logDirs + " from hbase tables "
-        + BackupServerUtil.join(tableNames) + " to tables " + BackupServerUtil.join(newTableNames));
+    String bulkOutputConfKey;
+
+    if (fullBackupRestore) {
+      player = new HFileSplitter();
+      bulkOutputConfKey = HFileSplitter.BULK_OUTPUT_CONF_KEY;
+    } else {
+      player = new WALPlayer();
+      bulkOutputConfKey = WALPlayer.BULK_OUTPUT_CONF_KEY;
+    }
+    // Player reads all files in arbitrary directory structure and creates 
+    // a Map task for each file
+    String dirs = StringUtils.join(dirPaths, ",");
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Restore " + (fullBackupRestore ? "full" : "incremental")
+          + " backup from directory " + dirs + " from hbase tables "
+          + BackupServerUtil.join(tableNames) + " to tables "
+          + BackupServerUtil.join(newTableNames));
+    }
 
     for (int i = 0; i < tableNames.length; i++) {
-      
-      LOG.info("Restore "+ tableNames[i] + " into "+ newTableNames[i]);
-      
+
+      LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]);
+
       Path bulkOutputPath = getBulkOutputDir(getFileNameCompatibleString(newTableNames[i]));
-      String[] playerArgs =
-        { logDirs, tableNames[i].getNameAsString() };
+      Configuration conf = getConf();
+      conf.set(bulkOutputConfKey, bulkOutputPath.toString());
+      String[] playerArgs = { dirs, tableNames[i].getNameAsString() };
 
       int result = 0;
       int loaderResult = 0;
       try {
-        Configuration conf = getConf();
-        conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
-        player.setConf(getConf());        
+
+        player.setConf(getConf());
         result = player.run(playerArgs);
         if (succeeded(result)) {
           // do bulk load
@@ -81,38 +94,37 @@ public class MapReduceRestoreService implements IncrementalRestoreService {
           }
           String[] args = { bulkOutputPath.toString(), newTableNames[i].getNameAsString() };
           loaderResult = loader.run(args);
-          if(failed(loaderResult)) {
-            throw new IOException("Can not restore from backup directory " + logDirs
-              + " (check Hadoop and HBase logs). Bulk loader return code =" + loaderResult);
+
+          if (failed(loaderResult)) {
+            throw new IOException("Can not restore from backup directory " + dirs
+                + " (check Hadoop and HBase logs). Bulk loader return code =" + loaderResult);
           }
         } else {
-            throw new IOException("Can not restore from backup directory " + logDirs
-                + " (check Hadoop/MR and HBase logs). WALPlayer return code =" + result);
+          throw new IOException("Can not restore from backup directory " + dirs
+              + " (check Hadoop/MR and HBase logs). Player return code =" + result);
         }
         LOG.debug("Restore Job finished:" + result);
       } catch (Exception e) {
-        throw new IOException("Can not restore from backup directory " + logDirs
+        throw new IOException("Can not restore from backup directory " + dirs
             + " (check Hadoop and HBase logs) ", e);
       }
 
     }
   }
 
-  private String getFileNameCompatibleString(TableName table)
-  {
-    return table.getNamespaceAsString() +"-"+ table.getQualifierAsString();
+  private String getFileNameCompatibleString(TableName table) {
+    return table.getNamespaceAsString() + "-" + table.getQualifierAsString();
   }
-  
+
   private boolean failed(int result) {
     return result != 0;
   }
-  
+
   private boolean succeeded(int result) {
     return result == 0;
   }
 
-  private LoadIncrementalHFiles createLoader()
-      throws IOException {
+  private LoadIncrementalHFiles createLoader() throws IOException {
     // set configuration for restore:
     // LoadIncrementalHFile needs more time
     // <name>hbase.rpc.timeout</name> <value>600000</value>
@@ -120,10 +132,11 @@ public class MapReduceRestoreService implements IncrementalRestoreService {
     Integer milliSecInHour = 3600000;
     Configuration conf = new Configuration(getConf());
     conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour);
-    
+
     // By default, it is 32 and loader will fail if # of files in any region exceed this
     // limit. Bad for snapshot restore.
     conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
+    conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
     LoadIncrementalHFiles loader = null;
     try {
       loader = new LoadIncrementalHFiles(conf);
@@ -133,27 +146,26 @@ public class MapReduceRestoreService implements IncrementalRestoreService {
     return loader;
   }
 
-  private Path getBulkOutputDir(String tableName) throws IOException
-  {
+  private Path getBulkOutputDir(String tableName) throws IOException {
     Configuration conf = getConf();
     FileSystem fs = FileSystem.get(conf);
-    String tmp = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
-        HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
-    Path path =  new Path(tmp + Path.SEPARATOR + "bulk_output-"+tableName + "-"
-        + EnvironmentEdgeManager.currentTime());
+    String tmp =
+        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
+    Path path =
+        new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-"
+            + EnvironmentEdgeManager.currentTime());
     fs.deleteOnExit(path);
     return path;
   }
-  
 
   @Override
   public Configuration getConf() {
-    return player.getConf();
+    return conf;
   }
 
   @Override
   public void setConf(Configuration conf) {
-    this.player.setConf(conf);
+    this.conf = conf;
   }
 
 }

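The reworked MapReduceRestoreService now handles both restore phases: for a
full image it runs HFileSplitter over the backed-up HFiles, for incremental
images it runs WALPlayer over the backed-up log directories, and in either
case it bulk loads the generated HFiles with LoadIncrementalHFiles. A
condensed view of the dispatch (directories and tables hypothetical):

  Configuration conf = HBaseConfiguration.create();
  RestoreService svc = BackupRestoreServerFactory.getRestoreService(conf);
  TableName[] from = { TableName.valueOf("t1") };
  TableName[] to = { TableName.valueOf("t1") };
  // Full image: HFileSplitter re-splits store files to target region boundaries.
  svc.run(new Path[] { new Path("/backup/data/t1") }, from, to, true);
  // Incremental images: WALPlayer replays the backed-up WAL directories.
  svc.run(new Path[] { new Path("/backup/WALs/backup_002") }, from, to, false);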
http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java
index c56aaf3..94e991f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java
@@ -65,7 +65,6 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescriptio
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.security.UserGroupInformation;
 
 @InterfaceAudience.Private
 public class FullTableBackupProcedure

http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/RestoreTablesProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/RestoreTablesProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/RestoreTablesProcedure.java
new file mode 100644
index 0000000..2678278
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/RestoreTablesProcedure.java
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.master;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
+import org.apache.hadoop.hbase.backup.util.RestoreServerUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.TableStateManager;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
+import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreTablesState;
+
+@InterfaceAudience.Private
+public class RestoreTablesProcedure
+    extends StateMachineProcedure<MasterProcedureEnv, RestoreTablesState>
+    implements TableProcedureInterface {
+  private static final Log LOG = LogFactory.getLog(RestoreTablesProcedure.class);
+
+  private final AtomicBoolean aborted = new AtomicBoolean(false);
+  private Configuration conf;
+  private String backupId;
+  private List<TableName> sTableList;
+  private List<TableName> tTableList;
+  private String targetRootDir;
+  private boolean isOverwrite;
+
+  public RestoreTablesProcedure() {
+    // Required by the Procedure framework to create the procedure on replay
+  }
+
+  public RestoreTablesProcedure(final MasterProcedureEnv env,
+      final String targetRootDir, String backupId, List<TableName> sTableList,
+      List<TableName> tTableList, boolean isOverwrite) throws IOException {
+    this.targetRootDir = targetRootDir;
+    this.backupId = backupId;
+    this.sTableList = sTableList;
+    this.tTableList = tTableList;
+    if (tTableList == null || tTableList.isEmpty()) {
+      this.tTableList = sTableList;
+    }
+    this.isOverwrite = isOverwrite;
+    this.setOwner(env.getRequestUser().getUGI().getShortUserName());
+  }
+
+  @Override
+  public byte[] getResult() {
+    return null;
+  }
+
+  /**
+   * Validate target Tables
+   * @param conn connection
+   * @param mgr table state manager
+   * @param tTableArray: target tables
+   * @param isOverwrite overwrite existing table
+   * @throws IOException exception
+   */
+  private void checkTargetTables(Connection conn, TableStateManager mgr, TableName[] tTableArray,
+      boolean isOverwrite)
+      throws IOException {
+    ArrayList<TableName> existTableList = new ArrayList<>();
+    ArrayList<TableName> disabledTableList = new ArrayList<>();
+
+    // check if the tables already exist
+    for (TableName tableName : tTableArray) {
+      if (MetaTableAccessor.tableExists(conn, tableName)) {
+        existTableList.add(tableName);
+        if (mgr.isTableState(tableName, TableState.State.DISABLED, TableState.State.DISABLING)) {
+          disabledTableList.add(tableName);
+        }
+      } else {
+        LOG.info("HBase table " + tableName
+            + " does not exist. It will be created during restore process");
+      }
+    }
+
+    if (existTableList.size() > 0) {
+      if (!isOverwrite) {
+        LOG.error("Existing table (" + existTableList + ") found in the restore target, please add "
+          + "\"-overwrite\" option in the command if you mean to restore to these existing tables");
+        throw new IOException("Existing table found in target while no \"-overwrite\" "
+            + "option found");
+      } else {
+        if (disabledTableList.size() > 0) {
+          LOG.error("Found offline table in the restore target, "
+              + "please enable them before restore with \"-overwrite\" option");
+          LOG.info("Offline table list in restore target: " + disabledTableList);
+          throw new IOException(
+              "Found offline table in the target when restore with \"-overwrite\" option");
+        }
+      }
+    }
+  }
+
+  /**
+   * Restore operation: handles each backupImage in the array
+   * @param svc: master services
+   * @param images: array of BackupImage
+   * @param sTable: table to be restored
+   * @param tTable: table to be restored to
+   * @param truncateIfExists: truncate table
+   * @throws IOException exception
+   */
+
+  private void restoreImages(MasterServices svc, BackupImage[] images, TableName sTable, TableName tTable,
+      boolean truncateIfExists) throws IOException {
+
+    // First image MUST be image of a FULL backup
+    BackupImage image = images[0];
+    String rootDir = image.getRootDir();
+    String backupId = image.getBackupId();
+    Path backupRoot = new Path(rootDir);
+    RestoreServerUtil restoreTool = new RestoreServerUtil(conf, backupRoot, backupId);
+    Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId);
+    String lastIncrBackupId = images.length == 1 ? null : images[images.length - 1].getBackupId();
+    // We need hFS only for full restore (see the code)
+    BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId);
+    if (manifest.getType() == BackupType.FULL) {
+      LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full"
+          + " backup image " + tableBackupPath.toString());
+      restoreTool.fullRestoreTable(svc, tableBackupPath, sTable, tTable, truncateIfExists,
+        lastIncrBackupId);
+    } else { // incremental Backup
+      throw new IOException("Unexpected backup type " + image.getType());
+    }
+
+    if (images.length == 1) {
+      // full backup restore done
+      return;
+    }
+
+    List<Path> dirList = new ArrayList<Path>();
+    // collect log directories from the incremental images;
+    // images[0], the full backup, was restored above
+    for (int i = 1; i < images.length; i++) {
+      BackupImage im = images[i];
+      String logBackupDir = HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId());
+      dirList.add(new Path(logBackupDir));
+    }
+
+    String dirs = StringUtils.join(dirList, ",");
+    LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from log dirs: " + dirs);
+    Path[] paths = new Path[dirList.size()];
+    dirList.toArray(paths);
+    restoreTool.incrementalRestoreTable(svc, tableBackupPath, paths, new TableName[] { sTable },
+      new TableName[] { tTable }, lastIncrBackupId);
+    LOG.info(sTable + " has been successfully restored to " + tTable);
+
+  }
+
+  /**
+   * Restore operation. Stage 2: resolve Backup Image dependencies
+   * @param svc: master services
+   * @param backupManifestMap : tableName,  Manifest
+   * @param sTableArray The array of tables to be restored
+   * @param tTableArray The array of mapping tables to restore to
+   * @param isOverwrite whether to overwrite existing tables
+   * @throws IOException exception
+   */
+  private void restore(MasterServices svc, HashMap<TableName, BackupManifest> backupManifestMap,
+      TableName[] sTableArray, TableName[] tTableArray, boolean isOverwrite) throws IOException {
+    TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>();
+    boolean truncateIfExists = isOverwrite;
+    try {
+      for (int i = 0; i < sTableArray.length; i++) {
+        TableName table = sTableArray[i];
+        BackupManifest manifest = backupManifestMap.get(table);
+        // Get the image list of this backup for restore in time order from old
+        // to new.
+        List<BackupImage> list = new ArrayList<BackupImage>();
+        list.add(manifest.getBackupImage());
+        TreeSet<BackupImage> set = new TreeSet<BackupImage>(list);
+        List<BackupImage> depList = manifest.getDependentListByTable(table);
+        set.addAll(depList);
+        BackupImage[] arr = new BackupImage[set.size()];
+        set.toArray(arr);
+        restoreImages(svc, arr, table, tTableArray[i], truncateIfExists);
+        restoreImageSet.addAll(list);
+        if (restoreImageSet != null && !restoreImageSet.isEmpty()) {
+          LOG.info("Restore includes the following image(s):");
+          for (BackupImage image : restoreImageSet) {
+            LOG.info("Backup: "
+                + image.getBackupId()
+                + " "
+                + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(),
+                  table));
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Failed", e);
+      throw new IOException(e);
+    }
+    LOG.debug("restoreStage finished");
+  }
+
+  @Override
+  protected Flow executeFromState(final MasterProcedureEnv env, final RestoreTablesState state)
+      throws InterruptedException {
+    if (conf == null) {
+      conf = env.getMasterConfiguration();
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + " execute state=" + state);
+    }
+    TableName[] tTableArray = tTableList.toArray(new TableName[tTableList.size()]);
+    try {
+      switch (state) {
+        case VALIDATION:
+
+          // check the target tables
+          checkTargetTables(env.getMasterServices().getConnection(),
+              env.getMasterServices().getTableStateManager(), tTableArray, isOverwrite);
+
+          setNextState(RestoreTablesState.RESTORE_IMAGES);
+          break;
+        case RESTORE_IMAGES:
+          TableName[] sTableArray = sTableList.toArray(new TableName[sTableList.size()]);
+          HashMap<TableName, BackupManifest> backupManifestMap = new HashMap<>();
+          // check and load backup image manifest for the tables
+          Path rootPath = new Path(targetRootDir);
+          HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath,
+            backupId);
+          restore(env.getMasterServices(), backupManifestMap, sTableArray, tTableArray, isOverwrite);
+          return Flow.NO_MORE_STATE;
+        default:
+          throw new UnsupportedOperationException("unhandled state=" + state);
+      }
+    } catch (IOException e) {
+      setFailure("restore-table", e);
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  @Override
+  protected void rollbackState(final MasterProcedureEnv env, final RestoreTablesState state)
+      throws IOException {
+  }
+
+  @Override
+  protected RestoreTablesState getState(final int stateId) {
+    return RestoreTablesState.valueOf(stateId);
+  }
+
+  @Override
+  protected int getStateId(final RestoreTablesState state) {
+    return state.getNumber();
+  }
+
+  @Override
+  protected RestoreTablesState getInitialState() {
+    return RestoreTablesState.VALIDATION;
+  }
+
+  @Override
+  protected void setNextState(final RestoreTablesState state) {
+    if (aborted.get()) {
+      setAbortFailure("restore-table", "abort requested");
+    } else {
+      super.setNextState(state);
+    }
+  }
+
+  @Override
+  public boolean abort(final MasterProcedureEnv env) {
+    aborted.set(true);
+    return true;
+  }
+
+  @Override
+  public void toStringClassDetails(StringBuilder sb) {
+    sb.append(getClass().getSimpleName());
+    sb.append(" (targetRootDir=");
+    sb.append(targetRootDir);
+    sb.append(" isOverwrite=");
+    sb.append(isOverwrite);
+    sb.append(" backupId=");
+    sb.append(backupId);
+    sb.append(")");
+  }
+
+  MasterProtos.RestoreTablesRequest toRestoreTables() {
+    MasterProtos.RestoreTablesRequest.Builder bldr = MasterProtos.RestoreTablesRequest.newBuilder();
+    bldr.setOverwrite(isOverwrite).setBackupId(backupId);
+    bldr.setBackupRootDir(targetRootDir);
+    for (TableName table : sTableList) {
+      bldr.addTables(ProtobufUtil.toProtoTableName(table));
+    }
+    for (TableName table : tTableList) {
+      bldr.addTargetTables(ProtobufUtil.toProtoTableName(table));
+    }
+    return bldr.build();
+  }
+
+  @Override
+  public void serializeStateData(final OutputStream stream) throws IOException {
+    super.serializeStateData(stream);
+
+    MasterProtos.RestoreTablesRequest restoreTables = toRestoreTables();
+    restoreTables.writeDelimitedTo(stream);
+  }
+
+  @Override
+  public void deserializeStateData(final InputStream stream) throws IOException {
+    super.deserializeStateData(stream);
+
+    MasterProtos.RestoreTablesRequest proto =
+        MasterProtos.RestoreTablesRequest.parseDelimitedFrom(stream);
+    backupId = proto.getBackupId();
+    targetRootDir = proto.getBackupRootDir();
+    isOverwrite = proto.getOverwrite();
+    sTableList = new ArrayList<>(proto.getTablesList().size());
+    for (HBaseProtos.TableName table : proto.getTablesList()) {
+      sTableList.add(ProtobufUtil.toTableName(table));
+    }
+    tTableList = new ArrayList<>(proto.getTargetTablesList().size());
+    for (HBaseProtos.TableName table : proto.getTargetTablesList()) {
+      tTableList.add(ProtobufUtil.toTableName(table));
+    }
+  }
+
+  @Override
+  public TableName getTableName() {
+    return TableName.BACKUP_TABLE_NAME;
+  }
+
+  @Override
+  public TableOperationType getTableOperationType() {
+    return TableOperationType.RESTORE;
+  }
+
+  @Override
+  protected boolean acquireLock(final MasterProcedureEnv env) {
+    if (env.waitInitialized(this)) {
+      return false;
+    }
+    return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME);
+  }
+
+  @Override
+  protected void releaseLock(final MasterProcedureEnv env) {
+    env.getProcedureQueue().releaseTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME);
+  }
+}
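
For reference, a minimal self-contained sketch (not part of this patch) of the ordering that
restoreImages() enforces above: the full backup image is restored first, then the log
directory of every incremental image is replayed in a single pass. Class and directory names
below are hypothetical; the real work is delegated to RestoreServerUtil.fullRestoreTable()
and RestoreServerUtil.incrementalRestoreTable().

  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.List;

  // Hypothetical sketch: images are sorted old -> new, index 0 is the full backup,
  // the remaining entries are incremental images whose log dirs are replayed together.
  public class RestoreOrderSketch {
    public static void main(String[] args) {
      List<String> images = Arrays.asList("backup_full_001", "backup_incr_002", "backup_incr_003");
      System.out.println("restore full image first: " + images.get(0));
      List<String> logDirs = new ArrayList<String>();
      for (int i = 1; i < images.size(); i++) {
        logDirs.add("/backup/" + images.get(i) + "/WALs"); // one log dir per incremental image
      }
      System.out.println("then replay incremental log dirs: " + String.join(",", logDirs));
    }
  }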

http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
index 37bfcc2..3da7860 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
@@ -41,10 +41,9 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupRestoreServerFactory;
 import org.apache.hadoop.hbase.backup.HBackupFileSystem;
-import org.apache.hadoop.hbase.backup.IncrementalRestoreService;
+import org.apache.hadoop.hbase.backup.RestoreService;
 import org.apache.hadoop.hbase.backup.RestoreRequest;
 import org.apache.hadoop.hbase.backup.impl.BackupManifest;
 import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
@@ -115,6 +114,7 @@ public class RestoreServerUtil {
    */
   Path getTableArchivePath(TableName tableName)
       throws IOException {
+
     Path baseDir = new Path(HBackupFileSystem.getTableBackupPath(tableName, backupRootPath, 
       backupId), HConstants.HFILE_ARCHIVE_DIRECTORY);
     Path dataDir = new Path(baseDir, HConstants.BASE_NAMESPACE_DIR);
@@ -148,8 +148,33 @@ public class RestoreServerUtil {
     return regionDirList;
   }
 
+  /**
+   * Gets the list of region directories for a table in the given backup image
+   * @param tableName table name
+   * @param backupId backup id
+   * @return list of region directory paths
+   * @throws FileNotFoundException exception
+   * @throws IOException exception
+   */
+  ArrayList<Path> getRegionList(TableName tableName, String backupId) throws FileNotFoundException,
+      IOException {
+    Path tableArchivePath =
+        new Path(BackupClientUtil.getTableBackupDir(backupRootPath.toString(), 
+          backupId, tableName));
+
+    ArrayList<Path> regionDirList = new ArrayList<Path>();
+    FileStatus[] children = fs.listStatus(tableArchivePath);
+    for (FileStatus childStatus : children) {
+      // each child is a region directory (named after the region)
+      Path child = childStatus.getPath();
+      regionDirList.add(child);
+    }
+    return regionDirList;
+  }
+  
   static void modifyTableSync(MasterServices svc, HTableDescriptor desc) throws IOException {
     svc.modifyTable(desc.getTableName(), desc, HConstants.NO_NONCE, HConstants.NO_NONCE);
+    @SuppressWarnings("serial")
     Pair<Integer, Integer> status = new Pair<Integer, Integer>() {{
       setFirst(0);
       setSecond(0);
@@ -234,16 +259,16 @@ public class RestoreServerUtil {
         LOG.info("Changed " + newTableDescriptor.getTableName() + " to: " + newTableDescriptor);
       }
     }
-    IncrementalRestoreService restoreService =
-        BackupRestoreServerFactory.getIncrementalRestoreService(conf);
+    RestoreService restoreService =
+        BackupRestoreServerFactory.getRestoreService(conf);
 
-    restoreService.run(logDirs, tableNames, newTableNames);
+    restoreService.run(logDirs, tableNames, newTableNames, false);
   }
 
   public void fullRestoreTable(MasterServices svc, Path tableBackupPath, TableName tableName,
-      TableName newTableName, boolean converted, boolean truncateIfExists, String lastIncrBackupId)
+      TableName newTableName, boolean truncateIfExists, String lastIncrBackupId)
           throws IOException {
-    restoreTableAndCreate(svc, tableName, newTableName, tableBackupPath, converted, truncateIfExists,
+    restoreTableAndCreate(svc, tableName, newTableName, tableBackupPath, truncateIfExists,
         lastIncrBackupId);
   }
 
@@ -355,20 +380,19 @@ public class RestoreServerUtil {
     if (lastIncrBackupId != null) {
       String target = BackupClientUtil.getTableBackupDir(backupRootPath.toString(),
           lastIncrBackupId, tableName);
-      // Path target = new Path(info.getBackupStatus(tableName).getTargetDir());
       return FSTableDescriptors.getTableDescriptorFromFs(fileSys,
           new Path(target)).getHTableDescriptor();
     }
     return null;
   }
 
-  private void restoreTableAndCreate(MasterServices svc, TableName tableName, TableName newTableName,
-      Path tableBackupPath, boolean converted, boolean truncateIfExists, String lastIncrBackupId)
-          throws IOException {
+  private void restoreTableAndCreate(MasterServices svc, TableName tableName,
+      TableName newTableName, Path tableBackupPath, boolean truncateIfExists,
+      String lastIncrBackupId) throws IOException {
     if (newTableName == null || newTableName.equals("")) {
       newTableName = tableName;
     }
-
+    boolean fullBackupRestoreOnly = lastIncrBackupId == null;
     FileSystem fileSys = tableBackupPath.getFileSystem(this.conf);
 
     HTableDescriptor tableDescriptor = getTableDescriptor(fileSys, tableName, lastIncrBackupId);
@@ -384,7 +408,7 @@ public class RestoreServerUtil {
         if (snapshotMap.get(tableName) != null) {
           SnapshotDescription desc =
               SnapshotDescriptionUtils.readSnapshotInfo(fileSys, tableSnapshotPath);
-          SnapshotManifest manifest = SnapshotManifest.open(conf,fileSys,tableSnapshotPath,desc);
+          SnapshotManifest manifest = SnapshotManifest.open(conf, fileSys, tableSnapshotPath, desc);
           tableDescriptor = manifest.getTableDescriptor();
           LOG.debug("obtained descriptor from " + manifest);
         } else {
@@ -395,9 +419,9 @@ public class RestoreServerUtil {
         if (tableDescriptor == null) {
           LOG.debug("Found no table descriptor in the snapshot dir, previous schema was lost");
         }
-      } else if (converted) {
-        // first check if this is a converted backup image
-        LOG.error("convert will be supported in a future jira");
+      } else {
+        throw new IOException("Table snapshot directory: " + tableSnapshotPath
+            + " does not exist.");
       }
     }
 
@@ -405,13 +429,13 @@ public class RestoreServerUtil {
     if (tableArchivePath == null) {
       if (tableDescriptor != null) {
         // find table descriptor but no archive dir => the table is empty, create table and exit
-        if(LOG.isDebugEnabled()) {
+        if (LOG.isDebugEnabled()) {
           LOG.debug("find table descriptor but no archive dir for table " + tableName
               + ", will only create table");
         }
         tableDescriptor.setName(newTableName);
-        checkAndCreateTable(svc, tableBackupPath, tableName, newTableName, null, 
-            tableDescriptor, truncateIfExists);
+        checkAndCreateTable(svc, tableBackupPath, tableName, newTableName, null, tableDescriptor,
+          truncateIfExists);
         return;
       } else {
         throw new IllegalStateException("Cannot restore hbase table because directory '"
@@ -426,50 +450,61 @@ public class RestoreServerUtil {
       tableDescriptor.setName(newTableName);
     }
 
-    if (!converted) {
-      // record all region dirs:
-      // load all files in dir
-      try {
-        ArrayList<Path> regionPathList = getRegionList(tableName);
-
-        // should only try to create the table with all region informations, so we could pre-split
-        // the regions in fine grain
-        checkAndCreateTable(svc, tableBackupPath, tableName, newTableName, regionPathList,
-            tableDescriptor, truncateIfExists);
-        if (tableArchivePath != null) {
-          // start real restore through bulkload
-          // if the backup target is on local cluster, special action needed
-          Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath);
-          if (tempTableArchivePath.equals(tableArchivePath)) {
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath);
-            }
-          } else {
-            regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath);
-            }
-          }
+    // record all region dirs:
+    // load all files in dir
+    try {
+      // Use the region splits recorded for the last incremental backup id
+      // so the table is created with matching pre-splits
+      ArrayList<Path> regionPathList =
+          fullBackupRestoreOnly ? getRegionList(tableName) : getRegionList(tableName,
+            lastIncrBackupId);
+
+      // create the table with all region information so that the regions
+      // can be pre-split at a fine grain
+      checkAndCreateTable(svc, tableBackupPath, tableName, newTableName, regionPathList,
+        tableDescriptor, truncateIfExists);
+
+      // Now get region splits from full backup
+      regionPathList = getRegionList(tableName);
+
+      // start the real restore through bulk load;
+      // if the backup target is on the local cluster, special handling is needed
+      Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath);
+      if (tempTableArchivePath.equals(tableArchivePath)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath);
+        }
+      } else {
+        regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath);
+        }
+      }
 
-          LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false);
-          for (Path regionPath : regionPathList) {
-            String regionName = regionPath.toString();
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("Restoring HFiles from directory " + regionName);
-            }
-            String[] args = { regionName, newTableName.getNameAsString() };
-            loader.run(args);
+      if (fullBackupRestoreOnly) {
+        LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false);
+        for (Path regionPath : regionPathList) {
+          String regionName = regionPath.toString();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Restoring HFiles from directory " + regionName);
           }
+          String[] args = { regionName, newTableName.getNameAsString() };
+          loader.run(args);
         }
-        // we do not recovered edits
-      } catch (Exception e) {
-        throw new IllegalStateException("Cannot restore hbase table", e);
+      } else {
+        // Run restore service
+        Path[] dirs = new Path[regionPathList.size()];
+        regionPathList.toArray(dirs);
+        RestoreService restoreService =
+            BackupRestoreServerFactory.getRestoreService(conf);
+
+        restoreService.run(dirs, new TableName[] { tableName }, new TableName[] { newTableName },
+          true);
       }
-    } else {
-      LOG.debug("convert will be supported in a future jira");
+    } catch (Exception e) {
+      throw new IllegalStateException("Cannot restore hbase table", e);
     }
   }
-
   /**
    * Gets region list
    * @param tableArchivePath table archive path

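For reference, a minimal self-contained sketch (not part of this patch) of the branch
introduced in restoreTableAndCreate() above: when lastIncrBackupId is set, the table is
pre-split from the region layout of that incremental image and the full-backup HFiles are
handed to the RestoreService; otherwise the full-backup region directories are bulk loaded
directly. The class and return strings below are hypothetical and only illustrate the decision.

  // Hypothetical sketch of the full-vs-incremental restore decision; not HBase API.
  public class RestorePathSketch {
    static String chooseRestorePath(String lastIncrBackupId) {
      boolean fullBackupRestoreOnly = (lastIncrBackupId == null);
      if (fullBackupRestoreOnly) {
        // pre-split from the full image, then bulk load each region dir (LoadIncrementalHFiles)
        return "bulk load full backup region dirs";
      }
      // pre-split from the last incremental image, then let the RestoreService re-split
      // the full backup HFiles against the new region boundaries
      return "run RestoreService for last incremental id " + lastIncrBackupId;
    }

    public static void main(String[] args) {
      System.out.println(chooseRestorePath(null));
      System.out.println(chooseRestorePath("backup_incr_002"));
    }
  }
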
http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat2.java
new file mode 100644
index 0000000..dfcd7be
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat2.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple input format for HFiles.
+ * This code was borrowed from the Apache Crunch project and
+ * updated for the current version of HBase.
+ */
+public class HFileInputFormat2 extends FileInputFormat<NullWritable, Cell> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HFileInputFormat2.class);
+
+  /**
+   * File filter that removes all "hidden" files. This might be worth moving into
+   * a more general-purpose utility; it accounts for the metadata files created
+   * by the way we do exports.
+   */
+  static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
+    public boolean accept(Path p) {
+      String name = p.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+  };
+
+  /**
+   * Record reader for HFiles.
+   */
+  private static class HFileRecordReader extends RecordReader<NullWritable, Cell> {
+
+    private Reader in;
+    protected Configuration conf;
+    private HFileScanner scanner;
+
+    /**
+     * A private cache of the key value so it doesn't need to be loaded twice from the scanner.
+     */
+    private Cell value = null;
+    private long count;
+    private boolean seeked = false;
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) 
+        throws IOException, InterruptedException {
+      FileSplit fileSplit = (FileSplit) split;
+      conf = context.getConfiguration();
+      Path path = fileSplit.getPath();
+      FileSystem fs = path.getFileSystem(conf);
+      LOG.info("Initializing HFileRecordReader for {}", path);
+      this.in = HFile.createReader(fs, path, new CacheConfig(conf), conf);
+
+      // The file info must be loaded before the scanner can be used.
+      // This seems like a bug in HBase, but it's easily worked around.
+      this.in.loadFileInfo();
+      this.scanner = in.getScanner(false, false);
+
+    }
+
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      boolean hasNext;
+      if (!seeked) {
+        LOG.info("Seeking to start");
+        hasNext = scanner.seekTo();
+        seeked = true;
+      } else {
+        hasNext = scanner.next();
+      }
+      if (!hasNext) {
+        return false;
+      }
+      value = scanner.getCell();
+      count++;
+      return true;
+    }
+
+    @Override
+    public NullWritable getCurrentKey() throws IOException, InterruptedException {
+      return NullWritable.get();
+    }
+
+    @Override
+    public Cell getCurrentValue() throws IOException, InterruptedException {
+      return value;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      // This would be inaccurate if KVs are not uniformly-sized or we have performed a seek to
+      // the start row, but better than nothing anyway.
+      return 1.0f * count / in.getEntries();
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (in != null) {
+        in.close();
+        in = null;
+      }
+    }
+  }
+
+  @Override
+  protected List<FileStatus> listStatus(JobContext job) throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+
+    // Explode out directories that match the original FileInputFormat filters,
+    // since HFiles are written to directories where the
+    // directory name is the column family name
+    for (FileStatus status : super.listStatus(job)) {
+      if (status.isDirectory()) {
+        FileSystem fs = status.getPath().getFileSystem(job.getConfiguration());
+        for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) {
+          result.add(match);
+        }
+      } else {
+        result.add(status);
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public RecordReader<NullWritable, Cell> createRecordReader(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    return new HFileRecordReader();
+  }
+
+  @Override
+  protected boolean isSplitable(JobContext context, Path filename) {
+    // HFiles are not splittable.
+    return false;
+  }
+}
\ No newline at end of file
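
For reference, a minimal driver sketch (not part of this patch) showing how HFileInputFormat2
could be wired into a MapReduce job; the input and output paths and the identity mapper are
assumptions made only for illustration.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.Cell;
  import org.apache.hadoop.hbase.mapreduce.HFileInputFormat2;
  import org.apache.hadoop.io.NullWritable;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  // Hypothetical driver: scans Cells out of HFiles under an assumed input directory and
  // writes them out through the identity mapper and the default text output format.
  public class HFileReadSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "hfile-read-sketch");
      job.setJarByClass(HFileReadSketch.class);
      job.setInputFormatClass(HFileInputFormat2.class);
      job.setMapperClass(Mapper.class);          // identity: passes (NullWritable, Cell) through
      job.setNumReduceTasks(0);                  // map-only job
      job.setOutputKeyClass(NullWritable.class);
      job.setOutputValueClass(Cell.class);
      FileInputFormat.addInputPath(job, new Path("/backup/hfiles"));      // assumed input dir
      FileOutputFormat.setOutputPath(job, new Path("/tmp/hfile-cells"));  // assumed output dir
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
  }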

http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index c54eee0..8c794c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -83,9 +83,9 @@ import org.apache.hadoop.hbase.backup.impl.BackupManager;
 import org.apache.hadoop.hbase.backup.impl.BackupManifest;
 import org.apache.hadoop.hbase.backup.impl.BackupRestoreConstants;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
-import org.apache.hadoop.hbase.backup.impl.RestoreTablesProcedure;
 import org.apache.hadoop.hbase.backup.master.FullTableBackupProcedure;
 import org.apache.hadoop.hbase.backup.master.IncrementalTableBackupProcedure;
+import org.apache.hadoop.hbase.backup.master.RestoreTablesProcedure;
 import org.apache.hadoop.hbase.backup.util.RestoreServerUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;

http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index ff5e739..ec53a64 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -284,7 +284,7 @@ public class TestBackupBase {
     FileSystem fs = FileSystem.get(conf1);
     RemoteIterator<LocatedFileStatus> it = fs.listFiles( new Path(BACKUP_ROOT_DIR), true);
     while(it.hasNext()){
-      LOG.debug("DDEBUG: "+it.next().getPath());
+      LOG.debug(it.next().getPath());
     }
 
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
index 2251c74..fe00ac5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.backup;
 
 import static org.junit.Assert.assertTrue;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -29,6 +28,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.util.RestoreServerUtil;
 import org.apache.hadoop.hbase.client.BackupAdmin;
@@ -37,14 +37,16 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.experimental.categories.Category;
 
 import com.google.common.collect.Lists;
 
@@ -66,6 +68,8 @@ public class TestIncrementalBackup extends TestBackupBase {
   //implement all test cases in 1 test since incremental backup/restore has dependencies
   @Test
   public void TestIncBackupRestore() throws Exception {
+    
+    int ADD_ROWS = 99;
     // #1 - create full backup for all tables
     LOG.info("create full backup image for all tables");
 
@@ -88,13 +92,13 @@ public class TestIncrementalBackup extends TestBackupBase {
     assertTrue(checkSucceeded(backupIdFull));
 
     // #2 - insert some data to table
-    HTable t1 = insertIntoTable(conn, table1, famName, 1, NB_ROWS_IN_BATCH);
-    LOG.debug("writing " + NB_ROWS_IN_BATCH + " rows to " + table1);
+    HTable t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
 
     Assert.assertThat(TEST_UTIL.countRows(t1), CoreMatchers.equalTo(
-        NB_ROWS_IN_BATCH * 2 + NB_ROWS_FAM3));
+        NB_ROWS_IN_BATCH + ADD_ROWS + NB_ROWS_FAM3));
     t1.close();
-    LOG.debug("written " + NB_ROWS_IN_BATCH + " rows to " + table1);
+    LOG.debug("written " + ADD_ROWS + " rows to " + table1);
 
     HTable t2 =  (HTable) conn.getTable(table2);
     Put p2;
@@ -107,7 +111,23 @@ public class TestIncrementalBackup extends TestBackupBase {
     Assert.assertThat(TEST_UTIL.countRows(t2), CoreMatchers.equalTo(NB_ROWS_IN_BATCH + 5));
     t2.close();
     LOG.debug("written " + 5 + " rows to " + table2);
+    // split table1 
+    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    List<HRegion> regions = cluster.getRegions(table1);
+
+    byte[] name = regions.get(0).getRegionInfo().getRegionName();    
+    long startSplitTime = EnvironmentEdgeManager.currentTime();
+    admin.splitRegion(name);
+
+    while (!admin.isTableAvailable(table1)) {
+      Thread.sleep(100);
+    }
+    
+    long endSplitTime = EnvironmentEdgeManager.currentTime();
 
+    // split finished
+    LOG.debug("split finished in " + (endSplitTime - startSplitTime) + " ms");
+    
     // #3 - incremental backup for multiple tables
     tables = Lists.newArrayList(table1, table2);
     request = new BackupRequest();
@@ -176,7 +196,7 @@ public class TestIncrementalBackup extends TestBackupBase {
     LOG.debug("After incremental restore: " + hTable.getTableDescriptor());
     LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows");
     Assert.assertThat(TEST_UTIL.countRows(hTable, famName),
-        CoreMatchers.equalTo(NB_ROWS_IN_BATCH * 2));
+        CoreMatchers.equalTo(NB_ROWS_IN_BATCH + ADD_ROWS));
     LOG.debug("f2 has " + TEST_UTIL.countRows(hTable, fam2Name) + " rows");
     Assert.assertThat(TEST_UTIL.countRows(hTable, fam2Name), CoreMatchers.equalTo(NB_ROWS_FAM2));
     hTable.close();