You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2017/03/20 22:49:37 UTC

[04/54] [abbrv] hbase git commit: HBASE-14123 HBase Backup/Restore Phase 2 (Vladimir Rodionov)

http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java
new file mode 100644
index 0000000..47e428c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.master;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.MetricsMaster;
+import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
+import org.apache.hadoop.hbase.procedure.Procedure;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Master procedure manager for coordinated cluster-wide WAL roll operation, which is run during
+ * backup operation, see {@link MasterProcedureManager} and {@link RegionServerProcedureManager}
+ */
+@InterfaceAudience.Private
+public class LogRollMasterProcedureManager extends MasterProcedureManager {
+
+  public static final String ROLLLOG_PROCEDURE_SIGNATURE = "rolllog-proc";
+  public static final String ROLLLOG_PROCEDURE_NAME = "rolllog";
+  private static final Log LOG = LogFactory.getLog(LogRollMasterProcedureManager.class);
+
+  private MasterServices master;
+  private ProcedureCoordinator coordinator;
+  private boolean done;
+
+  @Override
+  public void stop(String why) {
+    LOG.info("stop: " + why);
+  }
+
+  @Override
+  public boolean isStopped() {
+    return false;
+  }
+
+  @Override
+  public void initialize(MasterServices master, MetricsMaster metricsMaster)
+      throws KeeperException, IOException, UnsupportedOperationException {
+    this.master = master;
+    this.done = false;
+
+    // setup the default procedure coordinator
+    String name = master.getServerName().toString();
+    ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1);
+    BaseCoordinatedStateManager coordManager =
+        (BaseCoordinatedStateManager) CoordinatedStateManagerFactory
+            .getCoordinatedStateManager(master.getConfiguration());
+    coordManager.initialize(master);
+
+    ProcedureCoordinatorRpcs comms =
+        coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name);
+
+    this.coordinator = new ProcedureCoordinator(comms, tpool);
+  }
+
+  @Override
+  public String getProcedureSignature() {
+    return ROLLLOG_PROCEDURE_SIGNATURE;
+  }
+
+  @Override
+  public void execProcedure(ProcedureDescription desc) throws IOException {
+    if (!isBackupEnabled()) {
+      LOG.warn("Backup is not enabled. Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY
+          + " setting");
+      return;
+    }
+    this.done = false;
+    // start the process on the RS
+    ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
+    List<ServerName> serverNames = master.getServerManager().getOnlineServersList();
+    List<String> servers = new ArrayList<String>();
+    for (ServerName sn : serverNames) {
+      servers.add(sn.toString());
+    }
+
+    List<NameStringPair> conf = desc.getConfigurationList();
+    byte[] data = new byte[0];
+    if (conf.size() > 0) {
+      // Get backup root path
+      data = conf.get(0).getValue().getBytes();
+    }
+    Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), data, servers);
+    if (proc == null) {
+      String msg = "Failed to submit distributed procedure for '" + desc.getInstance() + "'";
+      LOG.error(msg);
+      throw new IOException(msg);
+    }
+
+    try {
+      // wait for the procedure to complete. A timer thread is kicked off that should cancel this
+      // if it takes too long.
+      proc.waitForCompleted();
+      LOG.info("Done waiting - exec procedure for " + desc.getInstance());
+      LOG.info("Distributed roll log procedure is successful!");
+      this.done = true;
+    } catch (InterruptedException e) {
+      ForeignException ee =
+          new ForeignException("Interrupted while waiting for roll log procdure to finish", e);
+      monitor.receive(ee);
+      Thread.currentThread().interrupt();
+    } catch (ForeignException e) {
+      ForeignException ee =
+          new ForeignException("Exception while waiting for roll log procdure to finish", e);
+      monitor.receive(ee);
+    }
+    monitor.rethrowException();
+  }
+
+  private boolean isBackupEnabled() {
+    return BackupManager.isBackupEnabled(master.getConfiguration());
+  }
+
+  @Override
+  public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
+    return done;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java
new file mode 100644
index 0000000..8fc644c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.regionserver;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.procedure.ProcedureMember;
+import org.apache.hadoop.hbase.procedure.Subprocedure;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.WAL;
+
+/**
+ * This backup sub-procedure implementation forces a WAL rolling on a RS.
+ */
+@InterfaceAudience.Private
+public class LogRollBackupSubprocedure extends Subprocedure {
+  private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedure.class);
+
+  private final RegionServerServices rss;
+  private final LogRollBackupSubprocedurePool taskManager;
+  private FSHLog hlog;
+  private String backupRoot;
+
+  public LogRollBackupSubprocedure(RegionServerServices rss, ProcedureMember member,
+      ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
+      LogRollBackupSubprocedurePool taskManager, byte[] data) {
+
+    super(member, LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, errorListener,
+        wakeFrequency, timeout);
+    LOG.info("Constructing a LogRollBackupSubprocedure.");
+    this.rss = rss;
+    this.taskManager = taskManager;
+    if (data != null) {
+      backupRoot = new String(data);
+    }
+  }
+
+  /**
+   * Callable task. TODO. We don't need a thread pool to execute roll log. This can be simplified
+   * with no use of subprocedurepool.
+   */
+  class RSRollLogTask implements Callable<Void> {
+    RSRollLogTask() {
+    }
+
+    @Override
+    public Void call() throws Exception {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("++ DRPC started: " + rss.getServerName());
+      }
+      hlog = (FSHLog) rss.getWAL(null);
+      long filenum = hlog.getFilenum();
+      List<WAL> wals = rss.getWALs();
+      long highest = -1;
+      for (WAL wal : wals) {
+        if (wal == null) continue;
+        if (((AbstractFSWAL) wal).getFilenum() > highest) {
+          highest = ((AbstractFSWAL) wal).getFilenum();
+        }
+      }
+
+      LOG.info("Trying to roll log in backup subprocedure, current log number: " + filenum
+          + " highest: " + highest + " on " + rss.getServerName());
+      ((HRegionServer) rss).getWalRoller().requestRollAll();
+      long start = EnvironmentEdgeManager.currentTime();
+      while (!((HRegionServer) rss).getWalRoller().walRollFinished()) {
+        Thread.sleep(20);
+      }
+      LOG.debug("log roll took " + (EnvironmentEdgeManager.currentTime() - start));
+      LOG.info("After roll log in backup subprocedure, current log number: " + hlog.getFilenum()
+          + " on " + rss.getServerName());
+
+      Connection connection = rss.getConnection();
+      try (final BackupSystemTable table = new BackupSystemTable(connection)) {
+        // sanity check: never replace a recorded roll result with an older (smaller) log number
+        HashMap<String, Long> serverTimestampMap =
+            table.readRegionServerLastLogRollResult(backupRoot);
+        String host = rss.getServerName().getHostname();
+        int port = rss.getServerName().getPort();
+        String server = host + ":" + port;
+        Long sts = serverTimestampMap.get(server);
+        if (sts != null && sts > highest) {
+          LOG.warn("Won't update server's last roll log result: current=" + sts + " new=" + highest);
+          return null;
+        }
+        // write the log number to backup system table, keyed by the same host:port string as above
+        table.writeRegionServerLastLogRollResult(server, highest, backupRoot);
+        return null;
+      } catch (Exception e) {
+        LOG.error(e);
+        throw e;
+      }
+    }
+  }
+
+  private void rolllog() throws ForeignException {
+    monitor.rethrowException();
+
+    taskManager.submitTask(new RSRollLogTask());
+    monitor.rethrowException();
+
+    // wait for everything to complete.
+    taskManager.waitForOutstandingTasks();
+    monitor.rethrowException();
+
+  }
+
+  @Override
+  public void acquireBarrier() throws ForeignException {
+    // do nothing, executing in inside barrier step.
+  }
+
+  /**
+   * Rolls the WAL by running the roll-log task while inside the barrier.
+   * @return always {@code null}
+   */
+  @Override
+  public byte[] insideBarrier() throws ForeignException {
+    rolllog();
+    return null;
+  }
+
+  /**
+   * Cancel threads if they haven't finished.
+   */
+  @Override
+  public void cleanup(Exception e) {
+    taskManager.abort("Aborting log roll subprocedure tasks for backup due to error", e);
+  }
+
+  /**
+   * Nothing to release; this step is a no-op for the log roll subprocedure.
+   */
+  public void releaseBarrier() {
+    // NO OP
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java
new file mode 100644
index 0000000..65a1fa3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.regionserver;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+
+/**
+ * Handle running each of the individual tasks for completing a backup procedure on a region
+ * server.
+ */
+@InterfaceAudience.Private
+public class LogRollBackupSubprocedurePool implements Closeable, Abortable {
+  private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedurePool.class);
+
+  /** Maximum number of backup tasks that can run concurrently */
+  private static final String CONCURENT_BACKUP_TASKS_KEY = "hbase.backup.region.concurrentTasks";
+  private static final int DEFAULT_CONCURRENT_BACKUP_TASKS = 3;
+
+  private final ExecutorCompletionService<Void> taskPool;
+  private final ThreadPoolExecutor executor;
+  private volatile boolean aborted;
+  private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
+  private final String name;
+
+  public LogRollBackupSubprocedurePool(String name, Configuration conf) {
+    // configure the executor service; the keep-alive is read from a *milliseconds* setting
+    long keepAlive =
+        conf.getLong(LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_KEY,
+          LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_DEFAULT);
+    int threads = conf.getInt(CONCURENT_BACKUP_TASKS_KEY, DEFAULT_CONCURRENT_BACKUP_TASKS);
+    this.name = name;
+    executor =
+        new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("rs(" + name
+                + ")-backup-pool"));
+    taskPool = new ExecutorCompletionService<Void>(executor);
+  }
+
+  /**
+   * Submit a task to the pool.
+   */
+  public void submitTask(final Callable<Void> task) {
+    Future<Void> f = this.taskPool.submit(task);
+    futures.add(f);
+  }
+
+  /**
+   * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}
+   * @return <tt>true</tt> on success, <tt>false</tt> otherwise
+   * @throws ForeignException exception
+   */
+  public boolean waitForOutstandingTasks() throws ForeignException {
+    LOG.debug("Waiting for backup procedure to finish.");
+
+    try {
+      for (Future<Void> f : futures) {
+        f.get();
+      }
+      return true;
+    } catch (InterruptedException e) {
+      if (aborted) {
+        throw new ForeignException("Interrupted and found to be aborted while waiting for tasks!",
+            e);
+      }
+      Thread.currentThread().interrupt();
+    } catch (ExecutionException e) {
+      if (e.getCause() instanceof ForeignException) {
+        throw (ForeignException) e.getCause();
+      }
+      throw new ForeignException(name, e.getCause());
+    } finally {
+      // close off remaining tasks
+      for (Future<Void> f : futures) {
+        if (!f.isDone()) {
+          f.cancel(true);
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly
+   * finish
+   */
+  @Override
+  public void close() {
+    executor.shutdown();
+  }
+
+  @Override
+  public void abort(String why, Throwable e) {
+    if (this.aborted) {
+      return;
+    }
+
+    this.aborted = true;
+    LOG.warn("Aborting because: " + why, e);
+    this.executor.shutdownNow();
+  }
+
+  @Override
+  public boolean isAborted() {
+    return this.aborted;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java
new file mode 100644
index 0000000..9d5a858
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.regionserver;
+
+import java.io.IOException;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
+import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.procedure.ProcedureMember;
+import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
+import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
+import org.apache.hadoop.hbase.procedure.Subprocedure;
+import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This manager class handles the work dealing with distributed WAL roll request.
+ * <p>
+ * This provides the mechanism necessary to kick off a backup-specific {@link Subprocedure} for
+ * which this region server is responsible. If any failures occur with the sub-procedure, the
+ * manager's procedure member notifies the procedure coordinator to abort all others.
+ * <p>
+ * On startup, requires {@link #start()} to be called.
+ * <p>
+ * On shutdown, requires org.apache.hadoop.hbase.procedure.ProcedureMember.close() to be called
+ */
+@InterfaceAudience.Private
+public class LogRollRegionServerProcedureManager extends RegionServerProcedureManager {
+
+  private static final Log LOG = LogFactory.getLog(LogRollRegionServerProcedureManager.class);
+
+  /** Conf key for number of request threads to start backup on region servers */
+  public static final String BACKUP_REQUEST_THREADS_KEY = "hbase.backup.region.pool.threads";
+  /** # of threads for backup work on the rs. */
+  public static final int BACKUP_REQUEST_THREADS_DEFAULT = 10;
+
+  public static final String BACKUP_TIMEOUT_MILLIS_KEY = "hbase.backup.timeout";
+  public static final long BACKUP_TIMEOUT_MILLIS_DEFAULT = 60000;
+
+  /** Conf key for millis between checks to see if backup work completed or if there are errors */
+  public static final String BACKUP_REQUEST_WAKE_MILLIS_KEY = "hbase.backup.region.wakefrequency";
+  /** Default amount of time to check for errors while regions finish backup work */
+  private static final long BACKUP_REQUEST_WAKE_MILLIS_DEFAULT = 500;
+
+  private RegionServerServices rss;
+  private ProcedureMemberRpcs memberRpcs;
+  private ProcedureMember member;
+  private boolean started = false;
+
+  /**
+   * Create a default backup procedure manager
+   */
+  public LogRollRegionServerProcedureManager() {
+  }
+
+  /**
+   * Start accepting backup procedure requests.
+   */
+  @Override
+  public void start() {
+    if (!BackupManager.isBackupEnabled(rss.getConfiguration())) {
+      LOG.warn("Backup is not enabled. Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY
+          + " setting");
+      return;
+    }
+    this.memberRpcs.start(rss.getServerName().toString(), member);
+    started = true;
+    LOG.info("Started region server backup manager.");
+  }
+
+  /**
+   * Close <tt>this</tt> and all running backup procedure tasks
+   * @param force forcefully stop all running tasks
+   * @throws IOException exception
+   */
+  @Override
+  public void stop(boolean force) throws IOException {
+    if (!started) {
+      return;
+    }
+    String mode = force ? "abruptly" : "gracefully";
+    LOG.info("Stopping RegionServerBackupManager " + mode + ".");
+
+    try {
+      this.member.close();
+    } finally {
+      this.memberRpcs.close();
+    }
+  }
+
+  /**
+   * If in a running state, creates the specified subprocedure for handling a backup procedure.
+   * @return Subprocedure to submit to the ProcedureMember.
+   */
+  public Subprocedure buildSubprocedure(byte[] data) {
+
+    // don't run a backup if the parent is stop(ping)
+    if (rss.isStopping() || rss.isStopped()) {
+      throw new IllegalStateException("Can't start backup procedure on RS: " + rss.getServerName()
+          + ", because stopping/stopped!");
+    }
+
+    LOG.info("Attempting to run a roll log procedure for backup.");
+    ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher();
+    Configuration conf = rss.getConfiguration();
+    long timeoutMillis = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT);
+    long wakeMillis =
+        conf.getLong(BACKUP_REQUEST_WAKE_MILLIS_KEY, BACKUP_REQUEST_WAKE_MILLIS_DEFAULT);
+
+    LogRollBackupSubprocedurePool taskManager =
+        new LogRollBackupSubprocedurePool(rss.getServerName().toString(), conf);
+    return new LogRollBackupSubprocedure(rss, member, errorDispatcher, wakeMillis, timeoutMillis,
+        taskManager, data);
+
+  }
+
+  /**
+   * Build the actual backup procedure runner that will do all the 'hard' work
+   */
+  public class BackupSubprocedureBuilder implements SubprocedureFactory {
+
+    @Override
+    public Subprocedure buildSubprocedure(String name, byte[] data) {
+      return LogRollRegionServerProcedureManager.this.buildSubprocedure(data);
+    }
+  }
+
+  @Override
+  public void initialize(RegionServerServices rss) throws KeeperException {
+    this.rss = rss;
+    if (!BackupManager.isBackupEnabled(rss.getConfiguration())) {
+      LOG.warn("Backup is not enabled. Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY
+          + " setting");
+      return;
+    }
+    BaseCoordinatedStateManager coordManager =
+        (BaseCoordinatedStateManager) CoordinatedStateManagerFactory.
+          getCoordinatedStateManager(rss.getConfiguration());
+    coordManager.initialize(rss);
+    this.memberRpcs =
+        coordManager
+            .getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE);
+
+    // read in the backup handler configuration properties
+    Configuration conf = rss.getConfiguration();
+    long keepAlive = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT);
+    int opThreads = conf.getInt(BACKUP_REQUEST_THREADS_KEY, BACKUP_REQUEST_THREADS_DEFAULT);
+    // create the actual cohort member
+    ThreadPoolExecutor pool =
+        ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive);
+    this.member = new ProcedureMember(memberRpcs, pool, new BackupSubprocedureBuilder());
+  }
+
+  @Override
+  public String getProcedureSignature() {
+    return "backup-proc";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java
new file mode 100644
index 0000000..0da6fc4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.util;
+
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Backup set is a named group of HBase tables, which are managed together by Backup/Restore
+ * framework. Instead of using list of tables in backup or restore operation, one can use set's name
+ * instead.
+ */
+@InterfaceAudience.Private
+public class BackupSet {
+  private final String name;
+  private final List<TableName> tables;
+
+  public BackupSet(String name, List<TableName> tables) {
+    this.name = name;
+    this.tables = tables;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public List<TableName> getTables() {
+    return tables;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(name).append("={");
+    sb.append(StringUtils.join(tables, ','));
+    sb.append("}");
+    return sb.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
new file mode 100644
index 0000000..e32853d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -0,0 +1,702 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.util;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.RestoreRequest;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+
+/**
+ * A collection of methods used by multiple classes to back up HBase tables.
+ */
+@InterfaceAudience.Private
+public final class BackupUtils {
+  // Logger shared by the static helpers of this class.
+  protected static final Log LOG = LogFactory.getLog(BackupUtils.class);
+  // Separator whose last occurrence splits a WAL file name; see parseHostFromOldLog and
+  // getCreationTime.
+  public static final String LOGNAME_SEPARATOR = ".";
+
+  // Utility class: the constructor always throws, so no instance can ever be created.
+  private BackupUtils() {
+    throw new AssertionError("Instantiating utility class...");
+  }
+
+  /**
+   * Regroups the per-table RS log timestamp map by region server and, for every region server,
+   * picks the smallest timestamp recorded for it across all the tables.
+   * @param rsLogTimestampMap map of table to (region server to timestamp)
+   * @return map of region server to its min timestamp, or null if the input is null or empty
+   */
+  public static HashMap<String, Long> getRSLogTimestampMins(
+      HashMap<TableName, HashMap<String, Long>> rsLogTimestampMap) {
+
+    if (rsLogTimestampMap == null || rsLogTimestampMap.isEmpty()) {
+      return null;
+    }
+
+    // Step 1: regroup the timestamps by region server.
+    HashMap<String, HashMap<TableName, Long>> byServer =
+        new HashMap<String, HashMap<TableName, Long>>();
+    for (Entry<TableName, HashMap<String, Long>> perTable : rsLogTimestampMap.entrySet()) {
+      for (Entry<String, Long> perServer : perTable.getValue().entrySet()) {
+        HashMap<TableName, Long> tableTimestamps = byServer.get(perServer.getKey());
+        if (tableTimestamps == null) {
+          tableTimestamps = new HashMap<TableName, Long>();
+          byServer.put(perServer.getKey(), tableTimestamps);
+        }
+        tableTimestamps.put(perTable.getKey(), perServer.getValue());
+      }
+    }
+
+    // Step 2: reduce the timestamps of every server to their minimum.
+    HashMap<String, Long> mins = new HashMap<String, Long>();
+    for (Entry<String, HashMap<TableName, Long>> perServer : byServer.entrySet()) {
+      mins.put(perServer.getKey(), BackupUtils.getMinValue(perServer.getValue()));
+    }
+    return mins;
+  }
+
+  /**
+   * copy out Table RegionInfo into incremental backup image need to consider move this
+   * logic into HBackupFileSystem
+   * @param conn connection
+   * @param backupInfo backup info
+   * @param conf configuration
+   * @throws IOException exception
+   * @throws InterruptedException exception
+   */
+  public static void copyTableRegionInfo(Connection conn, BackupInfo backupInfo,
+      Configuration conf) throws IOException, InterruptedException {
+    Path rootDir = FSUtils.getRootDir(conf);
+    FileSystem fs = rootDir.getFileSystem(conf);
+
+    // for each table in the table set, copy out the table info and region
+    // info files in the correct directory structure
+    for (TableName table : backupInfo.getTables()) {
+
+      if (!MetaTableAccessor.tableExists(conn, table)) {
+        LOG.warn("Table " + table + " does not exists, skipping it.");
+        continue;
+      }
+      HTableDescriptor orig = FSTableDescriptors.getTableDescriptorFromFs(fs, rootDir, table);
+
+      // write a copy of descriptor to the target directory
+      Path target = new Path(backupInfo.getTableBackupDir(table));
+      FileSystem targetFs = target.getFileSystem(conf);
+      FSTableDescriptors descriptors =
+          new FSTableDescriptors(conf, targetFs, FSUtils.getRootDir(conf));
+      descriptors.createTableDescriptorForTableDirectory(target, orig, false);
+      LOG.debug("Attempting to copy table info for:" + table + " target: " + target
+          + " descriptor: " + orig);
+      LOG.debug("Finished copying tableinfo.");
+      List<HRegionInfo> regions = null;
+      regions = MetaTableAccessor.getTableRegions(conn, table);
+      // For each region, write the region info to disk
+      LOG.debug("Starting to write region info for table " + table);
+      for (HRegionInfo regionInfo : regions) {
+        Path regionDir =
+            HRegion.getRegionDir(new Path(backupInfo.getTableBackupDir(table)),
+              regionInfo);
+        regionDir =
+            new Path(backupInfo.getTableBackupDir(table), regionDir.getName());
+        writeRegioninfoOnFilesystem(conf, targetFs, regionDir, regionInfo);
+      }
+      LOG.debug("Finished writing region info for table " + table);
+    }
+  }
+
+  /**
+   * Serializes the region info and writes it into the region info file of the given directory.
+   * The file name is "." followed by HConstants.REGIONINFO_QUALIFIER_STR.
+   * @param conf configuration
+   * @param fs file system to write to
+   * @param regionInfoDir directory which will contain the file
+   * @param regionInfo region info to write
+   * @throws IOException exception
+   */
+  public static void writeRegioninfoOnFilesystem(final Configuration conf, final FileSystem fs,
+      final Path regionInfoDir, HRegionInfo regionInfo) throws IOException {
+    final byte[] serialized = regionInfo.toDelimitedByteArray();
+    final Path targetFile = new Path(regionInfoDir, "." + HConstants.REGIONINFO_QUALIFIER_STR);
+    // permissions are looked up first, they are needed to create the file
+    final FsPermission permissions =
+        FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
+    final FSDataOutputStream stream = FSUtils.create(conf, fs, targetFile, permissions, null);
+    try {
+      stream.write(serialized);
+    } finally {
+      // close the stream even if the write failed
+      stream.close();
+    }
+  }
+
+  /**
+   * Extracts hostname:port from the path of a WAL file. Archived log files are delegated to
+   * {@link #parseHostFromOldLog(Path)}, for all other files the server name is derived from the
+   * WAL directory name.
+   * @param p path to WAL file
+   * @return hostname:port, or null if the path can not be parsed
+   */
+  public static String parseHostNameFromLogFile(Path p) {
+    try {
+      if (AbstractFSWALProvider.isArchivedLogFile(p)) {
+        return BackupUtils.parseHostFromOldLog(p);
+      }
+      ServerName sname = AbstractFSWALProvider.getServerNameFromWALDirectoryName(p);
+      if (sname == null) {
+        LOG.error("Skip log file (can't parse): " + p);
+        return null;
+      }
+      return sname.getAddress().toString();
+    } catch (Exception e) {
+      // any failure is logged and reported to the caller as "not parsable"
+      LOG.error("Skip log file (can't parse): " + p, e);
+      return null;
+    }
+  }
+
+  /**
+   * Returns WAL file name
+   * @param walFileName WAL file name
+   * @return WAL file name
+   * @throws IOException exception
+   * @throws IllegalArgumentException exception
+   */
+  public static String getUniqueWALFileNamePart(String walFileName) throws IOException {
+    return getUniqueWALFileNamePart(new Path(walFileName));
+  }
+
+  /**
+   * Returns WAL file name
+   * @param p WAL file path
+   * @return WAL file name
+   * @throws IOException exception
+   */
+  public static String getUniqueWALFileNamePart(Path p) throws IOException {
+    return p.getName();
+  }
+
+  /**
+   * Sums up the length of all the files found under the given directory, descending into
+   * sub-directories.
+   * @param fs The hadoop file system
+   * @param dir The target directory
+   * @return the total length of files, 0 if the listing of the directory is null
+   * @throws IOException exception
+   */
+  public static long getFilesLength(FileSystem fs, Path dir) throws IOException {
+    FileStatus[] entries = FSUtils.listStatus(fs, dir);
+    if (entries == null) {
+      return 0;
+    }
+    long sum = 0;
+    for (FileStatus entry : entries) {
+      // directories contribute the recursive sum, plain files their own length
+      sum += entry.isDirectory() ? getFilesLength(fs, entry.getPath()) : entry.getLen();
+    }
+    return sum;
+  }
+
+  /**
+   * Get the list of all WAL files (WALs and archive directories) whose creation time, parsed from
+   * the file name, is not newer than the timestamp recorded for the host that wrote them. Meta
+   * WALs, files whose host can not be parsed and files of hosts which are missing from the map
+   * are not included.
+   * @param c configuration
+   * @param hostTimestampMap {host,timestamp} map
+   * @return list of WAL files
+   * @throws IOException exception
+   */
+  public static List<String> getWALFilesOlderThan(final Configuration c,
+      final HashMap<String, Long> hostTimestampMap) throws IOException {
+    Path rootDir = FSUtils.getRootDir(c);
+    Path logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
+    Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    List<String> logFiles = new ArrayList<String>();
+
+    PathFilter filter = new PathFilter() {
+
+      @Override
+      public boolean accept(Path p) {
+        try {
+          if (AbstractFSWALProvider.isMetaFile(p)) {
+            return false;
+          }
+          String host = parseHostNameFromLogFile(p);
+          if (host == null) {
+            return false;
+          }
+          Long oldTimestamp = hostTimestampMap.get(host);
+          if (oldTimestamp == null) {
+            // No timestamp is known for this host: reject the file explicitly instead of
+            // relying on the NullPointerException of the unboxing below.
+            return false;
+          }
+          Long currentLogTS = BackupUtils.getCreationTime(p);
+          return currentLogTS <= oldTimestamp;
+        } catch (Exception e) {
+          LOG.warn("Can not parse " + p, e);
+          return false;
+        }
+      }
+    };
+    // Both directories live under the HBase root dir, so list them on the file system of the
+    // root dir, which is not necessarily the default file system of the configuration.
+    FileSystem fs = rootDir.getFileSystem(c);
+    logFiles = BackupUtils.getFiles(fs, logDir, logFiles, filter);
+    logFiles = BackupUtils.getFiles(fs, oldLogDir, logFiles, filter);
+    return logFiles;
+  }
+
+  /**
+   * Splits the string on BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND and converts
+   * every piece into a table name.
+   * @param tables delimited list of table names
+   * @return array of table names, null if the input is null
+   */
+  public static TableName[] parseTableNames(String tables) {
+    if (tables == null) {
+      return null;
+    }
+    String[] names = tables.split(BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND);
+    TableName[] parsed = new TableName[names.length];
+    int next = 0;
+    for (String name : names) {
+      parsed[next++] = TableName.valueOf(name);
+    }
+    return parsed;
+  }
+
+
+  /**
+   * Checks whether the given path exists on the file system it resolves to.
+   * @param backupStr path to check
+   * @param conf configuration
+   * @return true if the path exists, false otherwise
+   * @throws IOException exception
+   */
+  public static boolean checkPathExist(String backupStr, Configuration conf) throws IOException {
+    Path backupPath = new Path(backupStr);
+    FileSystem fileSys = backupPath.getFileSystem(conf);
+    // the scheme is only needed for the trace message below
+    String targetFsScheme = fileSys.getUri().getScheme();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Schema of given url: " + backupStr + " is: " + targetFsScheme);
+    }
+    return fileSys.exists(backupPath);
+  }
+
+  /**
+   * Check that the backup target path can be accessed and log whether it already exists or is
+   * going to be created. An unsupported file system scheme is reported with a dedicated message.
+   * @param backupRootPath backup destination path
+   * @param conf configuration
+   * @throws IOException if the path can not be checked; when the scheme of the path is not
+   *           supported the original exception is attached as the cause
+   */
+  public static void checkTargetDir(String backupRootPath, Configuration conf) throws IOException {
+    boolean targetExists = false;
+    try {
+      targetExists = checkPathExist(backupRootPath, conf);
+    } catch (IOException e) {
+      String expMsg = e.getMessage();
+      // getMessage() may be null, in which case the exception is rethrown unchanged
+      if (expMsg != null && expMsg.contains("No FileSystem for scheme")) {
+        // append the original message (not the still unassigned new one) and keep the cause
+        String newMsg =
+            "Unsupported filesystem scheme found in the backup target url. Error Message: "
+                + expMsg;
+        LOG.error(newMsg);
+        throw new IOException(newMsg, e);
+      }
+      throw e;
+    }
+
+    if (targetExists) {
+      LOG.info("Using existing backup root dir: " + backupRootPath);
+    } else {
+      LOG.info("Backup root dir " + backupRootPath + " does not exist. Will be created.");
+    }
+  }
+
+  /**
+   * Get the min value among all the values of a map.
+   * @param map map
+   * @return the min value, or null if the map is null or empty
+   */
+  public static <T> Long getMinValue(HashMap<T, Long> map) {
+    // an empty map has no minimum; report it like a null map instead of failing
+    if (map == null || map.isEmpty()) {
+      return null;
+    }
+    // a single pass is enough, there is no need to sort a copy of the values
+    return Collections.min(map.values());
+  }
+
+  /**
+   * Parses host name:port from the name of an archived WAL file. The part of the file name in
+   * front of the last {@link #LOGNAME_SEPARATOR} is URL-decoded and parsed as a server name.
+   * @param p path
+   * @return host name:port, or null if the file name can not be parsed
+   */
+  public static String parseHostFromOldLog(Path p) {
+    try {
+      String fileName = p.getName();
+      String encodedServer = fileName.substring(0, fileName.lastIndexOf(LOGNAME_SEPARATOR));
+      String server = URLDecoder.decode(encodedServer, "UTF8");
+      return ServerName.parseHostname(server) + ":" + ServerName.parsePort(server);
+    } catch (Exception e) {
+      // includes the case of a file name without any separator
+      LOG.warn("Skip log file (can't parse): " + p);
+      return null;
+    }
+  }
+
+  /**
+   * Parses the timestamp of a log file from its name: the timestamp is whatever follows the last
+   * {@link #LOGNAME_SEPARATOR} of the name.
+   * @param p a path to the log file
+   * @return the timestamp
+   * @throws IOException if the file name contains no separator
+   * @throws NumberFormatException if the part after the last separator is not a number
+   */
+  public static Long getCreationTime(Path p) throws IOException {
+    String fileName = p.getName();
+    int separatorPos = fileName.lastIndexOf(LOGNAME_SEPARATOR);
+    if (separatorPos < 0) {
+      throw new IOException("Cannot parse timestamp from path " + p);
+    }
+    return Long.parseLong(fileName.substring(separatorPos + 1));
+  }
+
+  /**
+   * Lists the files under the given directory recursively and appends the path of every file
+   * accepted by the filter to the given list.
+   * @param fs file system
+   * @param rootDir directory to list
+   * @param files list the accepted paths are appended to
+   * @param filter path filter
+   * @return the list passed in as {@code files}
+   * @throws FileNotFoundException exception
+   * @throws IOException exception
+   */
+  public static List<String> getFiles(FileSystem fs, Path rootDir, List<String> files,
+      PathFilter filter) throws FileNotFoundException, IOException {
+    RemoteIterator<LocatedFileStatus> listing = fs.listFiles(rootDir, true);
+    while (listing.hasNext()) {
+      LocatedFileStatus status = listing.next();
+      // directories are never reported, everything else has to pass the filter
+      if (!status.isDirectory() && filter.accept(status.getPath())) {
+        files.add(status.getPath().toString());
+      }
+    }
+    return files;
+  }
+
+  /**
+   * Cleans up the data of the given backup: first the log directory (cleanupHLogDir), then the
+   * target directory (cleanupTargetDir).
+   * @param context backup info
+   * @param conf configuration
+   * @throws IOException exception
+   */
+  public static void cleanupBackupData(BackupInfo context, Configuration conf) throws IOException {
+    cleanupHLogDir(context, conf);
+    cleanupTargetDir(context, conf);
+  }
+
+  /**
+   * Clean up directories which are generated when DistCp copying hlogs. Every entry found under
+   * the parent of the log target directory of the backup is deleted recursively.
+   * @param backupInfo backup info
+   * @param conf configuration
+   * @throws IOException exception
+   */
+  private static void cleanupHLogDir(BackupInfo backupInfo, Configuration conf)
+      throws IOException {
+
+    String logDir = backupInfo.getHLogTargetDir();
+    if (logDir == null) {
+      LOG.warn("No log directory specified for " + backupInfo.getBackupId());
+      return;
+    }
+
+    Path rootPath = new Path(logDir).getParent();
+    FileSystem fs = FileSystem.get(rootPath.toUri(), conf);
+    FileStatus[] entries = listStatus(fs, rootPath, null);
+    // listStatus reports a missing or empty directory as null
+    if (entries != null) {
+      for (FileStatus entry : entries) {
+        Path doomed = entry.getPath();
+        LOG.debug("Delete log files: " + doomed.getName());
+        fs.delete(doomed, true);
+      }
+    }
+  }
+
+
+  /**
+   * Clean up the data of the backup at the backup root directory: the backup directory of every
+   * table of the backup, the parent of that directory if nothing is left in it, and finally the
+   * directory named after the backup id. This is a best-effort operation: an IOException is
+   * logged (together with its stack trace) and not propagated.
+   * @param backupInfo backup info
+   * @param conf configuration
+   */
+  private static void cleanupTargetDir(BackupInfo backupInfo, Configuration conf) {
+    try {
+      // clean up the data at target directory
+      LOG.debug("Trying to cleanup up target dir : " + backupInfo.getBackupId());
+      String targetDir = backupInfo.getBackupRootDir();
+      if (targetDir == null) {
+        LOG.warn("No target directory specified for " + backupInfo.getBackupId());
+        return;
+      }
+
+      FileSystem outputFs = FileSystem.get(new Path(targetDir).toUri(), conf);
+
+      for (TableName table : backupInfo.getTables()) {
+        Path targetDirPath =
+            new Path(getTableBackupDir(targetDir, backupInfo.getBackupId(), table));
+        if (outputFs.delete(targetDirPath, true)) {
+          LOG.info("Cleaning up backup data at " + targetDirPath.toString() + " done.");
+        } else {
+          LOG.info("No data has been found in " + targetDirPath.toString() + ".");
+        }
+
+        // remove the parent directory as well once nothing is left in it
+        Path tableDir = targetDirPath.getParent();
+        FileStatus[] backups = listStatus(outputFs, tableDir, null);
+        if (backups == null || backups.length == 0) {
+          outputFs.delete(tableDir, true);
+          LOG.debug(tableDir.toString() + " is empty, remove it.");
+        }
+      }
+      outputFs.delete(new Path(targetDir, backupInfo.getBackupId()), true);
+    } catch (IOException e1) {
+      // pass the exception to the logger so that the stack trace is not lost
+      LOG.error("Cleaning up backup data of " + backupInfo.getBackupId() + " at "
+          + backupInfo.getBackupRootDir() + " failed due to " + e1.getMessage() + ".", e1);
+    }
+  }
+
+  /**
+   * Given the backup root dir, backup id and the table name, return the backup image location,
+   * which is also where the backup manifest file is. return value look like:
+   * "hdfs://backup.hbase.org:9000/user/biadmin/backup1/backup_1396650096738/default/t1_dn/"
+   * @param backupRootDir backup root directory
+   * @param backupId backup id
+   * @param tableName table name
+   * @return backupPath String for the particular table
+   */
+  public static String getTableBackupDir(String backupRootDir, String backupId,
+      TableName tableName) {
+    return backupRootDir + Path.SEPARATOR + backupId + Path.SEPARATOR
+        + tableName.getNamespaceAsString() + Path.SEPARATOR + tableName.getQualifierAsString()
+        + Path.SEPARATOR;
+  }
+
+  /**
+   * Sort history list by start time in descending order.
+   * @param historyList history list
+   * @return sorted list of BackupCompleteData
+   */
+  public static ArrayList<BackupInfo> sortHistoryListDesc(ArrayList<BackupInfo> historyList) {
+    ArrayList<BackupInfo> list = new ArrayList<BackupInfo>();
+    TreeMap<String, BackupInfo> map = new TreeMap<String, BackupInfo>();
+    for (BackupInfo h : historyList) {
+      map.put(Long.toString(h.getStartTs()), h);
+    }
+    Iterator<String> i = map.descendingKeySet().iterator();
+    while (i.hasNext()) {
+      list.add(map.get(i.next()));
+    }
+    return list;
+  }
+
+
+  /**
+   * Lists a directory, optionally through a filter, and treats FileNotFoundException as
+   * non-fatal. This accommodates differences between hadoop versions, where hadoop 1 does not
+   * throw a FileNotFoundException and returns an empty FileStatus[], while Hadoop 2 throws
+   * FileNotFoundException.
+   * @param fs file system
+   * @param dir directory
+   * @param filter path filter, may be null to list everything
+   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
+   * @throws IOException exception
+   */
+  public static FileStatus[]
+      listStatus(final FileSystem fs, final Path dir, final PathFilter filter) throws IOException {
+    FileStatus[] listing;
+    try {
+      if (filter == null) {
+        listing = fs.listStatus(dir);
+      } else {
+        listing = fs.listStatus(dir, filter);
+      }
+    } catch (FileNotFoundException fnfe) {
+      // a missing directory is reported the same way as an empty one
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(dir + " doesn't exist");
+      }
+      return null;
+    }
+    boolean nothingListed = listing == null || listing.length < 1;
+    return nothingListed ? null : listing;
+  }
+
+  /**
+   * Return the 'path' component of a Path. In Hadoop, a Path is a URI. This method returns the
+   * 'path' component of the Path's URI: e.g. if a Path is
+   * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>, this method returns
+   * <code>/hbase_trunk/TestTable/compaction.dir</code>. This method is useful if you want to print
+   * out a Path without the qualifying file system part.
+   * @param p file system Path whose 'path' component we are to return.
+   * @return the 'path' component of the URI of the Path
+   */
+  public static String getPath(Path p) {
+    return p.toUri().getPath();
+  }
+
+  /**
+   * Given the backup root dir and the backup id, return the log file location for an incremental
+   * backup.
+   * @param backupRootDir backup root directory
+   * @param backupId backup id
+   * @return logBackupDir: ".../user/biadmin/backup1/WALs/backup_1396650096738"
+   */
+  public static String getLogBackupDir(String backupRootDir, String backupId) {
+    return backupRootDir + Path.SEPARATOR + backupId + Path.SEPARATOR
+        + HConstants.HREGION_LOGDIR_NAME;
+  }
+
+  /**
+   * Loads the backup info of every backup found directly under the backup root destination.
+   * Directories whose backup info can not be loaded, or which contain no backup manifest, are
+   * logged and skipped.
+   * @param conf configuration
+   * @param backupRootPath backup root path
+   * @return backup infos sorted by the timestamp of their backup id, newest first
+   * @throws IOException if the backup root can not be listed
+   */
+  private static List<BackupInfo> getHistory(Configuration conf, Path backupRootPath)
+      throws IOException {
+    // Get all (n) history from backup root destination. The file system is resolved from the
+    // backup root itself, as it is not necessarily the default file system of the configuration.
+    FileSystem fs = FileSystem.get(backupRootPath.toUri(), conf);
+    RemoteIterator<LocatedFileStatus> it = fs.listLocatedStatus(backupRootPath);
+
+    List<BackupInfo> infos = new ArrayList<BackupInfo>();
+    while (it.hasNext()) {
+      LocatedFileStatus lfs = it.next();
+      if (!lfs.isDirectory()) {
+        continue;
+      }
+      String backupId = lfs.getPath().getName();
+      try {
+        BackupInfo info = loadBackupInfo(backupRootPath, backupId, fs);
+        if (info == null) {
+          // loadBackupInfo returns null when there is no manifest; a null element would
+          // break the sort below
+          LOG.warn("Can not find backup manifest under: " + lfs.getPath());
+          continue;
+        }
+        infos.add(info);
+      } catch (IOException e) {
+        LOG.error("Can not load backup info from: " + lfs.getPath(), e);
+      }
+    }
+    // Sort
+    Collections.sort(infos, new Comparator<BackupInfo>() {
+
+      @Override
+      public int compare(BackupInfo o1, BackupInfo o2) {
+        long ts1 = getTimestamp(o1.getBackupId());
+        long ts2 = getTimestamp(o2.getBackupId());
+        if (ts1 == ts2) {
+          return 0;
+        }
+        return ts1 < ts2 ? 1 : -1;
+      }
+
+      // the timestamp is the second "_"-separated token of the backup id
+      private long getTimestamp(String backupId) {
+        String[] split = backupId.split("_");
+        return Long.parseLong(split[1]);
+      }
+    });
+    return infos;
+  }
+
+  /**
+   * Returns up to n backup infos of the backup root, in the order of the backup history, which
+   * are accepted by all of the given filters.
+   * @param conf configuration
+   * @param n max number of backup infos to return
+   * @param backupRootPath backup root path
+   * @param filters filters a backup info has to pass
+   * @return list of backup infos
+   * @throws IOException exception
+   */
+  public static List<BackupInfo> getHistory(Configuration conf, int n, Path backupRootPath,
+      BackupInfo.Filter... filters) throws IOException {
+    List<BackupInfo> history = getHistory(conf, backupRootPath);
+    List<BackupInfo> selected = new ArrayList<BackupInfo>();
+    for (BackupInfo candidate : history) {
+      if (selected.size() == n) {
+        break;
+      }
+      // the first filter which rejects the candidate ends the evaluation
+      boolean accepted = true;
+      for (BackupInfo.Filter filter : filters) {
+        if (!filter.apply(candidate)) {
+          accepted = false;
+          break;
+        }
+      }
+      if (accepted) {
+        selected.add(candidate);
+      }
+    }
+    return selected;
+  }
+
+  /**
+   * Searches the directory of the backup recursively for a backup manifest file and converts
+   * the first manifest found into a backup info.
+   * @param backupRootPath backup root path
+   * @param backupId backup id
+   * @param fs file system
+   * @return backup info, or null if no manifest file has been found
+   * @throws IOException exception
+   */
+  public static BackupInfo loadBackupInfo(Path backupRootPath, String backupId, FileSystem fs)
+      throws IOException {
+    Path backupPath = new Path(backupRootPath, backupId);
+    RemoteIterator<LocatedFileStatus> files = fs.listFiles(backupPath, true);
+    while (files.hasNext()) {
+      Path candidate = files.next().getPath();
+      if (!candidate.getName().equals(BackupManifest.MANIFEST_FILE_NAME)) {
+        continue;
+      }
+      // the manifest is loaded from the directory which contains the manifest file
+      BackupManifest manifest = new BackupManifest(fs, candidate.getParent());
+      return manifest.toBackupInfo();
+    }
+    return null;
+  }
+
+  /**
+   * Creates a restore request out of the given values by means of RestoreRequest.Builder.
+   * @param backupRootDir backup root dir
+   * @param backupId backup id
+   * @param check check only
+   * @param fromTables table list from
+   * @param toTables   table list to
+   * @param isOverwrite overwrite data
+   * @return request object
+   */
+  public static RestoreRequest createRestoreRequest(String backupRootDir, String backupId,
+      boolean check, TableName[] fromTables, TableName[] toTables, boolean isOverwrite) {
+    return new RestoreRequest.Builder()
+        .withBackupRootDir(backupRootDir)
+        .withBackupId(backupId)
+        .withCheck(check)
+        .withFromTables(fromTables)
+        .withToTables(toTables)
+        .withOvewrite(isOverwrite)
+        .build();
+  }
+
+  /**
+   * Checks, for every table of the map, that the directories of the backup images its manifest
+   * reports as dependencies exist. The check of a table stops at its first missing image; the
+   * remaining tables are still checked.
+   * @param backupManifestMap map of table to its backup manifest
+   * @param conf configuration
+   * @return true if no missing backup image has been detected
+   * @throws IOException exception
+   */
+  public static boolean validate(HashMap<TableName, BackupManifest> backupManifestMap,
+      Configuration conf) throws IOException {
+    boolean allAvailable = true;
+
+    for (Entry<TableName, BackupManifest> manifestEntry : backupManifestMap.entrySet()) {
+      TableName table = manifestEntry.getKey();
+      BackupManifest manifest = manifestEntry.getValue();
+
+      // the tree set puts the images into their natural order
+      TreeSet<BackupImage> imageSet = new TreeSet<BackupImage>();
+      ArrayList<BackupImage> depList = manifest.getDependentListByTable(table);
+      if (depList != null) {
+        imageSet.addAll(depList);
+      }
+
+      LOG.info("Dependent image(s) from old to new:");
+      for (BackupImage image : imageSet) {
+        String imageDir =
+            HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), table);
+        if (BackupUtils.checkPathExist(imageDir, conf)) {
+          LOG.info("Backup image: " + image.getBackupId() + " for '" + table + "' is available");
+          continue;
+        }
+        LOG.error("ERROR: backup image does not exist: " + imageDir);
+        allAvailable = false;
+        break;
+      }
+    }
+    return allAvailable;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
new file mode 100644
index 0000000..a130c21
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
@@ -0,0 +1,610 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.util;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.RestoreJob;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+
+/**
+ * A collection for methods used by multiple classes to restore HBase tables.
+ */
+@InterfaceAudience.Private
+public class RestoreTool {
+
+  // Log under this class's own name; the logger was previously created for BackupUtils.
+  public static final Log LOG = LogFactory.getLog(RestoreTool.class);
+
+  // Directory names that are skipped when family directories are scanned for HFiles.
+  private final String[] ignoreDirs = { HConstants.RECOVERED_EDITS_DIR };
+
+  // Upper bound, in milliseconds, on the wait for a newly created table to become available.
+  private final static long TABLE_AVAILABILITY_WAIT_TIME = 180000;
+
+  protected Configuration conf = null;
+
+  protected Path backupRootPath;
+
+  protected String backupId;
+
+  // File system that backupRootPath lives on.
+  protected FileSystem fs;
+  // "restore" directory under the configured temporary FS directory; used as a staging area
+  // when the backup image has to be copied before it is bulk loaded.
+  private final Path restoreTmpPath;
+
+  // store table name and snapshot dir mapping
+  private final HashMap<TableName, Path> snapshotMap = new HashMap<>();
+
+  /**
+   * Creates a restore helper bound to a single backup image.
+   * @param conf configuration used to resolve file systems and the temporary directory
+   * @param backupRootPath root path of the backup destination
+   * @param backupId id of the backup image to restore from
+   * @throws IOException if the file system of the backup root path cannot be obtained
+   */
+  public RestoreTool(Configuration conf, final Path backupRootPath, final String backupId)
+      throws IOException {
+    this.conf = conf;
+    this.backupId = backupId;
+    this.backupRootPath = backupRootPath;
+    this.fs = this.backupRootPath.getFileSystem(this.conf);
+
+    // staging area: <temporary FS directory>/restore
+    String temporaryRoot =
+        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
+          HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
+    this.restoreTmpPath = new Path(temporaryRoot, "restore");
+  }
+
+  /**
+   * Resolves the archive directory of a table inside the backup image, for example
+   * ".../user/biadmin/backup1/default/t1_dn/backup_1396650096738/archive/data/default/t1_dn".
+   * @param tableName table name
+   * @return path to the table archive, or null if that path does not exist or is not a
+   *         directory (an empty table has no archive)
+   * @throws IOException exception
+   */
+  Path getTableArchivePath(TableName tableName) throws IOException {
+    Path archiveRoot =
+        new Path(HBackupFileSystem.getTableBackupPath(tableName, backupRootPath, backupId),
+            HConstants.HFILE_ARCHIVE_DIRECTORY);
+    Path namespaceDir =
+        new Path(new Path(archiveRoot, HConstants.BASE_NAMESPACE_DIR),
+            tableName.getNamespaceAsString());
+    Path candidate = new Path(namespaceDir, tableName.getQualifierAsString());
+
+    if (fs.exists(candidate) && fs.getFileStatus(candidate).isDirectory()) {
+      return candidate;
+    }
+    LOG.debug("Folder tableArchivePath: " + candidate.toString() + " does not exists");
+    return null; // empty table has no archive
+  }
+
+  /**
+   * Gets the region directories found in the archive of the given table.
+   * @param tableName table name
+   * @return list of region directories; empty if the table has no archive directory in the
+   *         backup image
+   * @throws FileNotFoundException exception
+   * @throws IOException exception
+   */
+  ArrayList<Path> getRegionList(TableName tableName) throws FileNotFoundException, IOException {
+    Path tableArchivePath = getTableArchivePath(tableName);
+    if (tableArchivePath == null) {
+      // getTableArchivePath() reports a missing archive (empty table) as null; listing a null
+      // path would fail with a NullPointerException, so report "no regions" instead
+      return new ArrayList<Path>();
+    }
+    // the listing itself is shared with the Path based overload
+    return getRegionList(tableArchivePath);
+  }
+
+
+  void modifyTableSync(Connection conn, HTableDescriptor desc) throws IOException {
+
+    try (Admin admin = conn.getAdmin();) {
+      admin.modifyTable(desc.getTableName(), desc);
+      int attempt = 0;
+      int maxAttempts = 600;
+      while (!admin.isTableAvailable(desc.getTableName())) {
+        Thread.sleep(100);
+        attempt++;
+        if (attempt++ > maxAttempts) {
+          throw new IOException("Timeout expired " + (maxAttempts * 100) + "ms");
+        }
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Restores tables from an incremental backup image. The schema of every target table is first
+   * aligned with the table descriptor stored in the incremental backup (column families are
+   * added or removed as needed), then the restore job is run on the given log directories.
+   * Currently tableNames and newTablesNames only contain single table, will be expanded to
+   * multiple tables in the future
+   * @param conn HBase connection
+   * @param tableBackupPath backup path
+   * @param logDirs : incremental backup folders, which contains WAL
+   * @param tableNames : source tableNames(table names were backuped)
+   * @param newTableNames : target tableNames(table names to be restored to)
+   * @param incrBackupId incremental backup Id
+   * @throws IOException if the arrays differ in length, a target table does not exist, the
+   *           table descriptor cannot be read from the backup or the restore fails
+   */
+  public void incrementalRestoreTable(Connection conn, Path tableBackupPath, Path[] logDirs,
+      TableName[] tableNames, TableName[] newTableNames, String incrBackupId) throws IOException {
+
+    try (Admin admin = conn.getAdmin()) {
+      if (tableNames.length != newTableNames.length) {
+        throw new IOException("Number of source tables and target tables does not match!");
+      }
+      FileSystem fileSys = tableBackupPath.getFileSystem(this.conf);
+
+      // for incremental backup image, expect the table already created either by user or previous
+      // full backup. Here, check that all new tables exists
+      for (TableName tableName : newTableNames) {
+        if (!admin.tableExists(tableName)) {
+          throw new IOException("HBase table " + tableName
+              + " does not exist. Create the table first, e.g. by restoring a full backup.");
+        }
+      }
+      // adjust table schema
+      for (int i = 0; i < tableNames.length; i++) {
+        TableName tableName = tableNames[i];
+        HTableDescriptor tableDescriptor = getTableDescriptor(fileSys, tableName, incrBackupId);
+        if (tableDescriptor == null) {
+          // getTableDescriptor() yields null when no backup id is given; fail with a clear
+          // message instead of a NullPointerException below
+          throw new IOException("Cannot read table descriptor for table " + tableName
+              + " from incremental backup " + incrBackupId);
+        }
+        LOG.debug("Found descriptor " + tableDescriptor + " through " + incrBackupId);
+
+        TableName newTableName = newTableNames[i];
+        HTableDescriptor newTableDescriptor = admin.getTableDescriptor(newTableName);
+        List<HColumnDescriptor> families = Arrays.asList(tableDescriptor.getColumnFamilies());
+        List<HColumnDescriptor> existingFamilies =
+            Arrays.asList(newTableDescriptor.getColumnFamilies());
+        boolean schemaChangeNeeded = false;
+        // families present in the backup but not in the target table are added
+        for (HColumnDescriptor family : families) {
+          if (!existingFamilies.contains(family)) {
+            newTableDescriptor.addFamily(family);
+            schemaChangeNeeded = true;
+          }
+        }
+        // families present in the target table but not in the backup are removed
+        for (HColumnDescriptor family : existingFamilies) {
+          if (!families.contains(family)) {
+            newTableDescriptor.removeFamily(family.getName());
+            schemaChangeNeeded = true;
+          }
+        }
+        if (schemaChangeNeeded) {
+          modifyTableSync(conn, newTableDescriptor);
+          LOG.info("Changed " + newTableDescriptor.getTableName() + " to: " + newTableDescriptor);
+        }
+      }
+      RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);
+
+      restoreService.run(logDirs, tableNames, newTableNames, false);
+    }
+  }
+
+  /**
+   * Restores a table from a full backup image; the work is delegated to restoreTableAndCreate.
+   * @param conn HBase connection
+   * @param tableBackupPath backup path; its file system is used to read the backup image
+   * @param tableName name of the table as it was backed up
+   * @param newTableName name of the table to restore into; tableName is used when this is null
+   * @param truncateIfExists truncates the target table if it exists
+   * @param lastIncrBackupId id of the incremental backup whose table descriptor is used; when
+   *          null the descriptor is read from the snapshot of the full backup
+   * @throws IOException exception
+   */
+  public void fullRestoreTable(Connection conn, Path tableBackupPath, TableName tableName,
+      TableName newTableName, boolean truncateIfExists, String lastIncrBackupId)
+          throws IOException {
+    restoreTableAndCreate(conn, tableName, newTableName, tableBackupPath, truncateIfExists,
+      lastIncrBackupId);
+  }
+
+  /**
+   * Returns the path of the snapshot directory of a table inside a backup image, i.e. the
+   * {@link HConstants#SNAPSHOT_DIR_NAME} child of the table backup path:
+   * "/$USER/SBACKUP_ROOT/backup_id/namespace/table/.hbase-snapshot"
+   * @param backupRootPath backup root path
+   * @param tableName table name
+   * @param backupId backup Id
+   * @return path for snapshot
+   */
+  Path getTableSnapshotPath(Path backupRootPath, TableName tableName, String backupId) {
+    return new Path(HBackupFileSystem.getTableBackupPath(tableName, backupRootPath, backupId),
+        HConstants.SNAPSHOT_DIR_NAME);
+  }
+
+  /**
+   * Looks up the snapshot info path of a table in the backup image, for example
+   * "/$USER/SBACKUP_ROOT/backup_id/namespace/table/.hbase-snapshot/snapshot_1396650097621_namespace_table".
+   * This path contains .snapshotinfo, .tabledesc (0.96 and 0.98) or .snapshotinfo,
+   * .data.manifest (trunk). The entries of the table snapshot directory are scanned in listing
+   * order: the first entry whose name ends with "data.manifest" wins, otherwise the last entry
+   * listed is returned.
+   * @param tableName table name
+   * @return path to table info, or null if the snapshot directory has no entries
+   * @throws FileNotFoundException exception
+   * @throws IOException exception
+   */
+  Path getTableInfoPath(TableName tableName) throws FileNotFoundException, IOException {
+    Path snapshotDir = getTableSnapshotPath(backupRootPath, tableName, backupId);
+
+    // can't build the path directly as the timestamp values are different
+    Path candidate = null;
+    for (FileStatus entry : fs.listStatus(snapshotDir)) {
+      candidate = entry.getPath();
+      // SnapshotManifest.DATA_MANIFEST_NAME = "data.manifest";
+      if (candidate.getName().endsWith("data.manifest")) {
+        return candidate;
+      }
+    }
+    return candidate;
+  }
+
+  /**
+   * Get table descriptor
+   * @param tableName is the table backed up
+   * @return {@link HTableDescriptor} saved in backup image of the table
+   */
+  HTableDescriptor getTableDesc(TableName tableName) throws FileNotFoundException, IOException {
+    Path tableInfoPath = this.getTableInfoPath(tableName);
+    SnapshotDescription desc = SnapshotDescriptionUtils.readSnapshotInfo(fs, tableInfoPath);
+    SnapshotManifest manifest = SnapshotManifest.open(conf, fs, tableInfoPath, desc);
+    HTableDescriptor tableDescriptor = manifest.getTableDescriptor();
+    if (!tableDescriptor.getTableName().equals(tableName)) {
+      LOG.error("couldn't find Table Desc for table: " + tableName + " under tableInfoPath: "
+          + tableInfoPath.toString());
+      LOG.error("tableDescriptor.getNameAsString() = " + tableDescriptor.getNameAsString());
+      throw new FileNotFoundException("couldn't find Table Desc for table: " + tableName
+          + " under tableInfoPath: " + tableInfoPath.toString());
+    }
+    return tableDescriptor;
+  }
+
+  /**
+   * Copies the table archive to the temporary restore directory when the backup image lives on
+   * the same file system as the default file system of the configuration, so that the bulk load
+   * works on the copy rather than on the backup image itself.
+   * @see HStore#bulkLoadHFile(String, long)
+   * @see HRegionFileSystem#bulkLoadStoreFile(String familyName, Path srcPath, long seqNum)
+   * @param tableArchivePath archive path
+   * @return the temporary restore path if a copy was made, otherwise tableArchivePath
+   * @throws IOException exception
+   */
+  Path checkLocalAndBackup(Path tableArchivePath) throws IOException {
+    FileSystem srcFs = tableArchivePath.getFileSystem(conf);
+    FileSystem desFs = FileSystem.get(conf);
+
+    // NOTE(review): getName() is the last path component, so this test looks like it can never
+    // be true - confirm what was intended
+    boolean copyRequired = tableArchivePath.getName().startsWith("/");
+    // This should match what is done in @see HRegionFileSystem#bulkLoadStoreFile(String, Path,
+    // long)
+    if (!copyRequired && srcFs.getUri().equals(desFs.getUri())) {
+      LOG.debug("cluster hold the backup image: " + srcFs.getUri() + "; local cluster node: "
+          + desFs.getUri());
+      copyRequired = true;
+    }
+    if (!copyRequired) {
+      return tableArchivePath;
+    }
+
+    LOG.debug("File " + tableArchivePath + " on local cluster, back it up before restore");
+    if (desFs.exists(restoreTmpPath)) {
+      // best effort clean-up of a previous staging directory; a failure is only logged
+      try {
+        desFs.delete(restoreTmpPath, true);
+      } catch (IOException e) {
+        LOG.debug("Failed to delete path: " + restoreTmpPath
+            + ", need to check whether restore target DFS cluster is healthy");
+      }
+    }
+    FileUtil.copy(srcFs, tableArchivePath, desFs, restoreTmpPath, false, conf);
+    LOG.debug("Copied to temporary path on local cluster: " + restoreTmpPath);
+    return restoreTmpPath;
+  }
+
+  /**
+   * Reads the table descriptor stored in the table backup directory of the given incremental
+   * backup.
+   * @param fileSys file system to read the descriptor from
+   * @param tableName table name
+   * @param lastIncrBackupId incremental backup Id, may be null
+   * @return the table descriptor, or null if lastIncrBackupId is null
+   * @throws IOException exception
+   */
+  private HTableDescriptor getTableDescriptor(FileSystem fileSys, TableName tableName,
+      String lastIncrBackupId) throws IOException {
+    if (lastIncrBackupId == null) {
+      return null;
+    }
+    String tableBackupDir =
+        BackupUtils.getTableBackupDir(backupRootPath.toString(), lastIncrBackupId, tableName);
+    Path descriptorDir = new Path(tableBackupDir);
+    return FSTableDescriptors.getTableDescriptorFromFs(fileSys, descriptorDir);
+  }
+
+  /**
+   * Restores a table from a full backup image: resolves the table descriptor, creates (or
+   * truncates) the target table and bulk loads the HFiles of every region directory found in
+   * the table archive.
+   * @param conn HBase connection
+   * @param tableName name of the table as it was backed up
+   * @param newTableName name of the table to restore into; tableName is used when this is null
+   * @param tableBackupPath backup path; its file system is used to read the backup image
+   * @param truncateIfExists truncates the target table if it exists
+   * @param lastIncrBackupId id of the incremental backup whose table descriptor is used; when
+   *          null the descriptor is read from the snapshot of the full backup
+   * @throws IOException if the snapshot directory is missing or the descriptor cannot be read
+   * @throws IllegalStateException if neither descriptor nor archive exist, or if creating or
+   *           loading the table fails
+   */
+  private void restoreTableAndCreate(Connection conn, TableName tableName, TableName newTableName,
+      Path tableBackupPath, boolean truncateIfExists, String lastIncrBackupId) throws IOException {
+    if (newTableName == null) {
+      newTableName = tableName;
+    }
+    FileSystem fileSys = tableBackupPath.getFileSystem(this.conf);
+
+    // get table descriptor first
+    HTableDescriptor tableDescriptor = getTableDescriptor(fileSys, tableName, lastIncrBackupId);
+    if (tableDescriptor != null) {
+      LOG.debug("Retrieved descriptor: " + tableDescriptor + " thru " + lastIncrBackupId);
+    } else {
+      // no descriptor from an incremental backup, fall back to the snapshot of the full backup
+      Path tableSnapshotPath = getTableSnapshotPath(backupRootPath, tableName, backupId);
+      if (!fileSys.exists(tableSnapshotPath)) {
+        throw new IOException("Table snapshot directory: " +
+            tableSnapshotPath + " does not exist.");
+      }
+      // snapshot path exist means the backup path is in HDFS
+      // check whether snapshot dir already recorded for target table
+      if (snapshotMap.get(tableName) != null) {
+        SnapshotDescription desc =
+            SnapshotDescriptionUtils.readSnapshotInfo(fileSys, tableSnapshotPath);
+        SnapshotManifest manifest = SnapshotManifest.open(conf, fileSys, tableSnapshotPath, desc);
+        tableDescriptor = manifest.getTableDescriptor();
+      } else {
+        tableDescriptor = getTableDesc(tableName);
+        snapshotMap.put(tableName, getTableInfoPath(tableName));
+      }
+      if (tableDescriptor == null) {
+        LOG.debug("Found no table descriptor in the snapshot dir, previous schema would be lost");
+      }
+    }
+
+    Path tableArchivePath = getTableArchivePath(tableName);
+    if (tableArchivePath == null) {
+      if (tableDescriptor == null) {
+        throw new IllegalStateException("Cannot restore hbase table " + tableName
+            + ": neither a table descriptor nor an archive directory was found in backup "
+            + backupId);
+      }
+      // find table descriptor but no archive dir means the table is empty, create table and exit
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("find table descriptor but no archive dir for table " + tableName
+            + ", will only create table");
+      }
+      tableDescriptor.setName(newTableName);
+      checkAndCreateTable(conn, tableBackupPath, tableName, newTableName, null, tableDescriptor,
+        truncateIfExists);
+      return;
+    }
+
+    if (tableDescriptor == null) {
+      tableDescriptor = new HTableDescriptor(newTableName);
+    } else {
+      tableDescriptor.setName(newTableName);
+    }
+
+    // record all region dirs:
+    // load all files in dir
+    try {
+      // the archive path is already resolved (and known to exist), no need to look it up again
+      ArrayList<Path> regionPathList = getRegionList(tableArchivePath);
+
+      // should only try to create the table with all region informations, so we could pre-split
+      // the regions in fine grain
+      checkAndCreateTable(conn, tableBackupPath, tableName, newTableName, regionPathList,
+        tableDescriptor, truncateIfExists);
+
+      // start real restore through bulkload
+      // if the backup target is on local cluster, special action needed
+      Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath);
+      if (tempTableArchivePath.equals(tableArchivePath)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath);
+        }
+      } else {
+        regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath);
+        }
+      }
+
+      LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false);
+      for (Path regionPath : regionPathList) {
+        String regionName = regionPath.toString();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Restoring HFiles from directory " + regionName);
+        }
+        String[] args = { regionName, newTableName.getNameAsString() };
+        loader.run(args);
+      }
+      // we do not recovered edits
+    } catch (Exception e) {
+      throw new IllegalStateException("Cannot restore hbase table", e);
+    }
+  }
+
+  /**
+   * Lists the children of a table archive directory; every child is taken to be a region
+   * directory.
+   * @param tableArchivePath table archive path
+   * @return list with the path of every child of tableArchivePath
+   * @throws FileNotFoundException exception
+   * @throws IOException exception
+   */
+  ArrayList<Path> getRegionList(Path tableArchivePath) throws FileNotFoundException, IOException {
+    FileStatus[] regionStatuses = fs.listStatus(tableArchivePath);
+    ArrayList<Path> regionDirs = new ArrayList<Path>(regionStatuses.length);
+    for (int i = 0; i < regionStatuses.length; i++) {
+      // each entry refers to one region (name)
+      regionDirs.add(regionStatuses[i].getPath());
+    }
+    return regionDirs;
+  }
+
+  /**
+   * Create a {@link LoadIncrementalHFiles} instance to be used to restore the HFiles of a full
+   * backup. As a side effect the configuration of this tool is changed: the limit of files per
+   * region and family is lifted and unmatched column families are ignored.
+   * @param tableArchivePath table archive path (not used by this method)
+   * @param multipleTables multiple tables flag (not used by this method)
+   * @return the {@link LoadIncrementalHFiles} instance
+   * @throws IOException if the loader cannot be instantiated
+   */
+  private LoadIncrementalHFiles createLoader(Path tableArchivePath, boolean multipleTables)
+      throws IOException {
+    // By default, it is 32 and loader will fail if # of files in any region exceed this
+    // limit. Bad for snapshot restore.
+    this.conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
+    this.conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
+    try {
+      return new LoadIncrementalHFiles(this.conf);
+    } catch (Exception cause) {
+      throw new IOException(cause);
+    }
+  }
+
+  /**
+   * Calculate region boundaries and add all the column families to the table descriptor
+   * @param regionDirList region dir list
+   * @return a set of keys to store the boundaries
+   */
+  byte[][] generateBoundaryKeys(ArrayList<Path> regionDirList) throws FileNotFoundException,
+      IOException {
+    TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    // Build a set of keys to store the boundaries
+    // calculate region boundaries and add all the column families to the table descriptor
+    for (Path regionDir : regionDirList) {
+      LOG.debug("Parsing region dir: " + regionDir);
+      Path hfofDir = regionDir;
+
+      if (!fs.exists(hfofDir)) {
+        LOG.warn("HFileOutputFormat dir " + hfofDir + " not found");
+      }
+
+      FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
+      if (familyDirStatuses == null) {
+        throw new IOException("No families found in " + hfofDir);
+      }
+
+      for (FileStatus stat : familyDirStatuses) {
+        if (!stat.isDirectory()) {
+          LOG.warn("Skipping non-directory " + stat.getPath());
+          continue;
+        }
+        boolean isIgnore = false;
+        String pathName = stat.getPath().getName();
+        for (String ignore : ignoreDirs) {
+          if (pathName.contains(ignore)) {
+            LOG.warn("Skipping non-family directory" + pathName);
+            isIgnore = true;
+            break;
+          }
+        }
+        if (isIgnore) {
+          continue;
+        }
+        Path familyDir = stat.getPath();
+        LOG.debug("Parsing family dir [" + familyDir.toString() + " in region [" + regionDir + "]");
+        // Skip _logs, etc
+        if (familyDir.getName().startsWith("_") || familyDir.getName().startsWith(".")) {
+          continue;
+        }
+
+        // start to parse hfile inside one family dir
+        Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
+        for (Path hfile : hfiles) {
+          if (hfile.getName().startsWith("_") || hfile.getName().startsWith(".")
+              || StoreFileInfo.isReference(hfile.getName())
+              || HFileLink.isHFileLink(hfile.getName())) {
+            continue;
+          }
+          HFile.Reader reader = HFile.createReader(fs, hfile, conf);
+          final byte[] first, last;
+          try {
+            reader.loadFileInfo();
+            first = reader.getFirstRowKey();
+            last = reader.getLastRowKey();
+            LOG.debug("Trying to figure out region boundaries hfile=" + hfile + " first="
+                + Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));
+
+            // To eventually infer start key-end key boundaries
+            Integer value = map.containsKey(first) ? (Integer) map.get(first) : 0;
+            map.put(first, value + 1);
+            value = map.containsKey(last) ? (Integer) map.get(last) : 0;
+            map.put(last, value - 1);
+          } finally {
+            reader.close();
+          }
+        }
+      }
+    }
+    return LoadIncrementalHFiles.inferBoundaries(map);
+  }
+
+  /**
+   * Prepare the table for bulkload, most codes copied from
+   * {@link LoadIncrementalHFiles#createTable(String, String)}. An existing target table is
+   * either truncated (keeping its region splits) or used as is; a missing target table is
+   * created, pre-split on the boundaries inferred from the region directories, and the method
+   * then waits until the new table is available.
+   * @param conn connection
+   * @param tableBackupPath path
+   * @param tableName table name
+   * @param targetTableName target table name
+   * @param regionDirList region directory list; null or empty creates the table without splits
+   * @param htd table descriptor
+   * @param truncateIfExists truncates table if exists
+   * @throws IOException if a table operation fails, the new table does not become available in
+   *           time or the waiting thread is interrupted
+   */
+  private void checkAndCreateTable(Connection conn, Path tableBackupPath, TableName tableName,
+      TableName targetTableName, ArrayList<Path> regionDirList, HTableDescriptor htd,
+      boolean truncateIfExists) throws IOException {
+    try (Admin admin = conn.getAdmin()) {
+      if (admin.tableExists(targetTableName)) {
+        if (truncateIfExists) {
+          LOG.info("Truncating exising target table '" + targetTableName
+              + "', preserving region splits");
+          admin.disableTable(targetTableName);
+          admin.truncateTable(targetTableName, true);
+        } else {
+          LOG.info("Using exising target table '" + targetTableName + "'");
+        }
+        return;
+      }
+
+      LOG.info("Creating target table '" + targetTableName + "'");
+      byte[][] keys = null;
+      if (regionDirList == null || regionDirList.isEmpty()) {
+        admin.createTable(htd, null);
+      } else {
+        keys = generateBoundaryKeys(regionDirList);
+        // create table using table descriptor and region boundaries
+        admin.createTable(htd, keys);
+      }
+
+      long startTime = EnvironmentEdgeManager.currentTime();
+      while (!admin.isTableAvailable(targetTableName, keys)) {
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException ie) {
+          // Stop waiting: with the interrupt flag restored every further sleep() would throw
+          // at once and the loop would spin on isTableAvailable() until the timeout expires.
+          Thread.currentThread().interrupt();
+          throw new IOException("Interrupted while waiting for table " + targetTableName
+              + " to become available", ie);
+        }
+        if (EnvironmentEdgeManager.currentTime() - startTime > TABLE_AVAILABILITY_WAIT_TIME) {
+          throw new IOException("Time out " + TABLE_AVAILABILITY_WAIT_TIME + "ms expired, table "
+              + targetTableName + " is still not available");
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
index ae36f08..0e3755b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
@@ -17,9 +17,14 @@
  */
 package org.apache.hadoop.hbase.coordination;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import java.io.IOException;
+
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Base class for {@link org.apache.hadoop.hbase.CoordinatedStateManager} implementations.
@@ -51,8 +56,21 @@ public abstract class BaseCoordinatedStateManager implements CoordinatedStateMan
    * Method to retrieve coordination for split log worker
    */
   public abstract  SplitLogWorkerCoordination getSplitLogWorkerCoordination();
+
   /**
    * Method to retrieve coordination for split log manager
    */
   public abstract SplitLogManagerCoordination getSplitLogManagerCoordination();
+  /**
+   * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs}
+   * @param procType procedure type (presumably selects the kind of procedure being coordinated;
+   *          confirm with the implementations)
+   * @param coordNode presumably the name of the coordinator node; confirm with the
+   *          implementations
+   * @return the coordinator RPCs
+   * @throws IOException exception
+   */
+  public abstract ProcedureCoordinatorRpcs
+    getProcedureCoordinatorRpcs(String procType, String coordNode) throws IOException;
+
+  /**
+   * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs}
+   * @param procType procedure type (presumably selects the kind of procedure the member takes
+   *          part in; confirm with the implementations)
+   * @return the member RPCs
+   * @throws KeeperException exception
+   */
+  public abstract ProcedureMemberRpcs
+    getProcedureMemberRpcs(String procType) throws KeeperException;
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
index 3e89be7..8ce5f9c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
@@ -17,10 +17,17 @@
  */
 package org.apache.hadoop.hbase.coordination;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import java.io.IOException;
+
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
+import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator;
+import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * ZooKeeper-based implementation of {@link org.apache.hadoop.hbase.CoordinatedStateManager}.
@@ -49,9 +56,21 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
   @Override
   public SplitLogWorkerCoordination getSplitLogWorkerCoordination() {
     return splitLogWorkerCoordination;
-    }
+  }
+
   @Override
   public SplitLogManagerCoordination getSplitLogManagerCoordination() {
     return splitLogManagerCoordination;
   }
+
+  /**
+   * Returns a new {@link ZKProcedureCoordinator} on every call, built from this manager's
+   * {@code watcher} and the given procedure type and coordinator node.
+   */
+  @Override
+  public ProcedureCoordinatorRpcs getProcedureCoordinatorRpcs(String procType, String coordNode)
+      throws IOException {
+    return new ZKProcedureCoordinator(watcher, procType, coordNode);
+  }
+
+  /**
+   * Returns a new {@link ZKProcedureMemberRpcs} on every call, built from this manager's
+   * {@code watcher} and the given procedure type.
+   */
+  @Override
+  public ProcedureMemberRpcs getProcedureMemberRpcs(String procType) throws KeeperException {
+    return new ZKProcedureMemberRpcs(watcher, procType);
+  }
 }