Posted to commits@hbase.apache.org by el...@apache.org on 2017/08/23 16:47:29 UTC

[32/36] hbase git commit: HBASE-17614: Move Backup/Restore into separate module (Vladimir Rodionov)

http://git-wip-us.apache.org/repos/asf/hbase/blob/37c65946/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
new file mode 100644
index 0000000..6330899
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable.WALItem;
+import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+
+/**
+ * After a full backup has been created, an incremental backup stores only the changes made since
+ * the last full or incremental backup. Creating the backup copies the log files in .logs and
+ * .oldlogs written since the last backup timestamp.
+ */
+@InterfaceAudience.Private
+public class IncrementalBackupManager extends BackupManager {
+  public static final Log LOG = LogFactory.getLog(IncrementalBackupManager.class);
+
+  public IncrementalBackupManager(Connection conn, Configuration conf) throws IOException {
+    super(conn, conf);
+  }
+
+  /**
+   * Obtain the list of logs that need to be copied out for this incremental backup. The list is set
+   * in BackupInfo.
+   * @return The new HashMap of RS log time stamps after the log roll for this incremental backup.
+   * @throws IOException exception
+   */
+  public HashMap<String, Long> getIncrBackupLogFileMap()
+      throws IOException {
+    List<String> logList;
+    HashMap<String, Long> newTimestamps;
+    HashMap<String, Long> previousTimestampMins;
+
+    String savedStartCode = readBackupStartCode();
+
+    // key: tableName
+    // value: <RegionServer,PreviousTimeStamp>
+    HashMap<TableName, HashMap<String, Long>> previousTimestampMap = readLogTimestampMap();
+
+    previousTimestampMins = BackupUtils.getRSLogTimestampMins(previousTimestampMap);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("StartCode " + savedStartCode + "for backupID " + backupInfo.getBackupId());
+    }
+    // get all new log files from .logs and .oldlogs after last TS and before new timestamp
+    if (savedStartCode == null || previousTimestampMins == null
+        || previousTimestampMins.isEmpty()) {
+      throw new IOException(
+          "Cannot read any previous back up timestamps from backup system table. "
+              + "In order to create an incremental backup, at least one full backup is needed.");
+    }
+
+    LOG.info("Execute roll log procedure for incremental backup ...");
+    HashMap<String, String> props = new HashMap<String, String>();
+    props.put("backupRoot", backupInfo.getBackupRootDir());
+
+    try (Admin admin = conn.getAdmin();) {
+
+      admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
+        LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
+
+    }
+    newTimestamps = readRegionServerLastLogRollResult();
+
+    logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
+    List<WALItem> logFromSystemTable =
+        getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps, getBackupInfo()
+            .getBackupRootDir());
+    logList = excludeAlreadyBackedUpWALs(logList, logFromSystemTable);
+    backupInfo.setIncrBackupFileList(logList);
+
+    return newTimestamps;
+  }
+
+  /**
+   * Get the list of WAL files eligible for incremental backup.
+   * @return list of WAL files
+   * @throws IOException exception
+   */
+  public List<String> getIncrBackupLogFileList()
+      throws IOException {
+    List<String> logList;
+    HashMap<String, Long> newTimestamps;
+    HashMap<String, Long> previousTimestampMins;
+
+    String savedStartCode = readBackupStartCode();
+
+    // key: tableName
+    // value: <RegionServer,PreviousTimeStamp>
+    HashMap<TableName, HashMap<String, Long>> previousTimestampMap = readLogTimestampMap();
+
+    previousTimestampMins = BackupUtils.getRSLogTimestampMins(previousTimestampMap);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("StartCode " + savedStartCode + "for backupID " + backupInfo.getBackupId());
+    }
+    // get all new log files from .logs and .oldlogs after last TS and before new timestamp
+    if (savedStartCode == null || previousTimestampMins == null
+        || previousTimestampMins.isEmpty()) {
+      throw new IOException(
+          "Cannot read any previous back up timestamps from backup system table. "
+              + "In order to create an incremental backup, at least one full backup is needed.");
+    }
+
+    newTimestamps = readRegionServerLastLogRollResult();
+
+    logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
+    List<WALItem> logFromSystemTable =
+        getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps, getBackupInfo()
+            .getBackupRootDir());
+
+    logList = excludeAlreadyBackedUpWALs(logList, logFromSystemTable);
+    backupInfo.setIncrBackupFileList(logList);
+
+    return logList;
+  }
+
+
+  private List<String> excludeAlreadyBackedUpWALs(List<String> logList,
+      List<WALItem> logFromSystemTable) {
+
+    Set<String> walFileNameSet = convertToSet(logFromSystemTable);
+
+    List<String> list = new ArrayList<String>();
+    for (int i=0; i < logList.size(); i++) {
+      Path p = new Path(logList.get(i));
+      String name  = p.getName();
+      if (walFileNameSet.contains(name)) continue;
+      list.add(logList.get(i));
+    }
+    return list;
+  }
+
+  /**
+   * Create a set of WAL file names (not full path names).
+   * @param logFromSystemTable WAL items read from the backup system table
+   * @return set of WAL file names
+   */
+  private Set<String> convertToSet(List<WALItem> logFromSystemTable) {
+
+    Set<String> set = new HashSet<String>();
+    for (int i=0; i < logFromSystemTable.size(); i++) {
+      WALItem item = logFromSystemTable.get(i);
+      set.add(item.walFile);
+    }
+    return set;
+  }
+
+  /**
+   * For each region server: get all log files newer than the last timestamps, but not newer than
+   * the newest timestamps.
+   * @param olderTimestamps timestamp map for each region server of the last backup.
+   * @param newestTimestamps timestamp map for each region server that the backup should lead to.
+   * @param backupRoot backup destination root directory
+   * @return list of log files that need to be added to this backup
+   * @throws IOException exception
+   */
+  private List<WALItem> getLogFilesFromBackupSystem(HashMap<String, Long> olderTimestamps,
+      HashMap<String, Long> newestTimestamps, String backupRoot) throws IOException {
+    List<WALItem> logFiles = new ArrayList<WALItem>();
+    Iterator<WALItem> it = getWALFilesFromBackupSystem();
+    while (it.hasNext()) {
+      WALItem item = it.next();
+      String rootDir = item.getBackupRoot();
+      if (!rootDir.equals(backupRoot)) {
+        continue;
+      }
+      String walFileName = item.getWalFile();
+      String server = BackupUtils.parseHostNameFromLogFile(new Path(walFileName));
+      if (server == null) {
+        continue;
+      }
+      Long tss = getTimestamp(walFileName);
+      Long oldTss = olderTimestamps.get(server);
+      Long newTss = newestTimestamps.get(server);
+      if (oldTss == null) {
+        logFiles.add(item);
+        continue;
+      }
+      if (newTss == null) {
+        newTss = Long.MAX_VALUE;
+      }
+      if (tss > oldTss && tss < newTss) {
+        logFiles.add(item);
+      }
+    }
+    return logFiles;
+  }
+
+  private Long getTimestamp(String walFileName) {
+    int index = walFileName.lastIndexOf(BackupUtils.LOGNAME_SEPARATOR);
+    return Long.parseLong(walFileName.substring(index + 1));
+  }
+
+  /**
+   * For each region server: get all log files newer than the last timestamps but not newer than the
+   * newest timestamps.
+   * @param olderTimestamps the timestamp for each region server of the last backup.
+   * @param newestTimestamps the timestamp for each region server that the backup should lead to.
+   * @param conf the Hadoop and HBase configuration
+   * @param savedStartCode the startcode (timestamp) of last successful backup.
+   * @return a list of log files to be backed up
+   * @throws IOException exception
+   */
+  private List<String> getLogFilesForNewBackup(HashMap<String, Long> olderTimestamps,
+      HashMap<String, Long> newestTimestamps, Configuration conf, String savedStartCode)
+      throws IOException {
+    LOG.debug("In getLogFilesForNewBackup()\n" + "olderTimestamps: " + olderTimestamps
+        + "\n newestTimestamps: " + newestTimestamps);
+    Path rootdir = FSUtils.getRootDir(conf);
+    Path logDir = new Path(rootdir, HConstants.HREGION_LOGDIR_NAME);
+    Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
+    FileSystem fs = rootdir.getFileSystem(conf);
+    NewestLogFilter pathFilter = new NewestLogFilter();
+
+    List<String> resultLogFiles = new ArrayList<String>();
+    List<String> newestLogs = new ArrayList<String>();
+
+    /*
+     * The old region server and timestamp info we kept in the backup system table may be out of
+     * sync if a new region server was added or an existing one was lost. We'll deal with it here
+     * when processing the logs. If the backup system table has more hosts, just ignore it. If the .logs
+     * directory includes more hosts, the additional hosts will not have old timestamps to compare
+     * with. We'll just use all the logs in that directory. We always write up-to-date region server
+     * and timestamp info to backup system table at the end of successful backup.
+     */
+
+    FileStatus[] rss;
+    Path p;
+    String host;
+    Long oldTimeStamp;
+    String currentLogFile;
+    long currentLogTS;
+
+    // Get the files in .logs.
+    rss = fs.listStatus(logDir);
+    for (FileStatus rs : rss) {
+      p = rs.getPath();
+      host = BackupUtils.parseHostNameFromLogFile(p);
+      if (host == null) {
+        continue;
+      }
+      FileStatus[] logs;
+      oldTimeStamp = olderTimestamps.get(host);
+      // It is possible that there is no old timestamp in backup system table for this host if
+      // this region server is newly added after our last backup.
+      if (oldTimeStamp == null) {
+        logs = fs.listStatus(p);
+      } else {
+        pathFilter.setLastBackupTS(oldTimeStamp);
+        logs = fs.listStatus(p, pathFilter);
+      }
+      for (FileStatus log : logs) {
+        LOG.debug("currentLogFile: " + log.getPath().toString());
+        if (AbstractFSWALProvider.isMetaFile(log.getPath())) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Skip hbase:meta log file: " + log.getPath().getName());
+          }
+          continue;
+        }
+        currentLogFile = log.getPath().toString();
+        resultLogFiles.add(currentLogFile);
+        currentLogTS = BackupUtils.getCreationTime(log.getPath());
+        // newestTimestamps is up-to-date with the current list of hosts
+        // so newestTimestamps.get(host) will not be null.
+        if (currentLogTS > newestTimestamps.get(host)) {
+          newestLogs.add(currentLogFile);
+        }
+      }
+    }
+
+    // Include the .oldlogs files too.
+    FileStatus[] oldlogs = fs.listStatus(oldLogDir);
+    for (FileStatus oldlog : oldlogs) {
+      p = oldlog.getPath();
+      currentLogFile = p.toString();
+      if (AbstractFSWALProvider.isMetaFile(p)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skip .meta log file: " + currentLogFile);
+        }
+        continue;
+      }
+      host = BackupUtils.parseHostFromOldLog(p);
+      if (host == null) {
+        continue;
+      }
+      currentLogTS = BackupUtils.getCreationTime(p);
+      oldTimeStamp = olderTimestamps.get(host);
+      /*
+       * It is possible that there is no old timestamp in backup system table for this host. At the
+       * time of our last backup operation, this rs did not exist. The reason can be one of the two:
+       * 1. The rs already left/crashed. Its logs were moved to .oldlogs. 2. The rs was added after
+       * our last backup.
+       */
+      if (oldTimeStamp == null) {
+        if (currentLogTS < Long.parseLong(savedStartCode)) {
+          // This log file is really old; its region server was gone before our last backup, so skip it.
+          continue;
+        } else {
+          resultLogFiles.add(currentLogFile);
+        }
+      } else if (currentLogTS > oldTimeStamp) {
+        resultLogFiles.add(currentLogFile);
+      }
+
+      // It is possible that a host in .oldlogs is an obsolete region server
+      // so newestTimestamps.get(host) here can be null.
+      // Even if these logs belong to an obsolete region server, we still need
+      // to include them to avoid loss of edits for backup.
+      Long newTimestamp = newestTimestamps.get(host);
+      if (newTimestamp != null && currentLogTS > newTimestamp) {
+        newestLogs.add(currentLogFile);
+      }
+    }
+    // remove newest log per host because they are still in use
+    resultLogFiles.removeAll(newestLogs);
+    return resultLogFiles;
+  }
+
+  static class NewestLogFilter implements PathFilter {
+    private Long lastBackupTS = 0L;
+
+    public NewestLogFilter() {
+    }
+
+    protected void setLastBackupTS(Long ts) {
+      this.lastBackupTS = ts;
+    }
+
+    @Override
+    public boolean accept(Path path) {
+      // skip meta table log -- ts.meta file
+      if (AbstractFSWALProvider.isMetaFile(path)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skip .meta log file: " + path.getName());
+        }
+        return false;
+      }
+      long timestamp;
+      try {
+        timestamp = BackupUtils.getCreationTime(path);
+        return timestamp > lastBackupTS;
+      } catch (Exception e) {
+        LOG.warn("Cannot read timestamp of log file " + path);
+        return false;
+      }
+    }
+  }
+
+}
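
For reference, a minimal standalone sketch (not part of this commit) of the per-region-server
timestamp window that getLogFilesFromBackupSystem() and getLogFilesForNewBackup() apply above:
a WAL is selected when it is newer than the server's last-backup timestamp and older than the
post-roll timestamp, and a server with no previous timestamp keeps all of its WALs. The class
and method names below are illustrative only.

import java.util.ArrayList;
import java.util.List;

public class WalWindowSketch {
  // Keep WALs written after the host's last-backup timestamp (exclusive) and
  // before the post-roll timestamp (exclusive). A host with no previous
  // timestamp was added after the last backup, so all of its WALs are kept.
  static List<Long> selectWalsForHost(List<Long> walTimestamps, Long oldTs, Long newTs) {
    List<Long> selected = new ArrayList<>();
    long upper = (newTs == null) ? Long.MAX_VALUE : newTs;
    for (Long ts : walTimestamps) {
      if (oldTs == null || (ts > oldTs && ts < upper)) {
        selected.add(ts);
      }
    }
    return selected;
  }
}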

http://git-wip-us.apache.org/repos/asf/hbase/blob/37c65946/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
new file mode 100644
index 0000000..6d48c32
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupCopyJob;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
+import org.apache.hadoop.hbase.backup.BackupRequest;
+import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.util.Tool;
+
+/**
+ * Incremental backup implementation.
+ * See the {@link #execute() execute} method.
+ *
+ */
+@InterfaceAudience.Private
+public class IncrementalTableBackupClient extends TableBackupClient {
+  private static final Log LOG = LogFactory.getLog(IncrementalTableBackupClient.class);
+
+  protected IncrementalTableBackupClient() {
+  }
+
+  public IncrementalTableBackupClient(final Connection conn, final String backupId,
+      BackupRequest request) throws IOException {
+    super(conn, backupId, request);
+  }
+
+  protected List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    List<String> list = new ArrayList<String>();
+    for (String file : incrBackupFileList) {
+      Path p = new Path(file);
+      if (fs.exists(p) || isActiveWalPath(p)) {
+        list.add(file);
+      } else {
+        LOG.warn("Can't find file: " + file);
+      }
+    }
+    return list;
+  }
+
+  /**
+   * Check if a given path belongs to the active WAL directory.
+   * @param p path
+   * @return true, if yes
+   */
+  protected boolean isActiveWalPath(Path p) {
+    return !AbstractFSWALProvider.isArchivedLogFile(p);
+  }
+
+  protected static int getIndex(TableName tbl, List<TableName> sTableList) {
+    if (sTableList == null) return 0;
+    for (int i = 0; i < sTableList.size(); i++) {
+      if (tbl.equals(sTableList.get(i))) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  /*
+   * Reads bulk load records from backup table, iterates through the records and forms the paths
+   * for bulk loaded hfiles. Copies the bulk loaded hfiles to backup destination
+   * @param sTableList list of tables to be backed up
+   * @return map of table to List of files
+   */
+  protected Map<byte[], List<Path>>[] handleBulkLoad(List<TableName> sTableList) throws IOException {
+    Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()];
+    Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair =
+    backupManager.readBulkloadRows(sTableList);
+    Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = pair.getFirst();
+    FileSystem fs = FileSystem.get(conf);
+    FileSystem tgtFs;
+    try {
+      tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
+    } catch (URISyntaxException use) {
+      throw new IOException("Unable to get FileSystem", use);
+    }
+    Path rootdir = FSUtils.getRootDir(conf);
+    Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);
+    for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> tblEntry :
+      map.entrySet()) {
+      TableName srcTable = tblEntry.getKey();
+      int srcIdx = getIndex(srcTable, sTableList);
+      if (srcIdx < 0) {
+        LOG.warn("Couldn't find " + srcTable + " in source table List");
+        continue;
+      }
+      if (mapForSrc[srcIdx] == null) {
+        mapForSrc[srcIdx] = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
+      }
+      Path tblDir = FSUtils.getTableDir(rootdir, srcTable);
+      Path tgtTable = new Path(new Path(tgtRoot, srcTable.getNamespaceAsString()),
+          srcTable.getQualifierAsString());
+      for (Map.Entry<String,Map<String,List<Pair<String, Boolean>>>> regionEntry :
+        tblEntry.getValue().entrySet()){
+        String regionName = regionEntry.getKey();
+        Path regionDir = new Path(tblDir, regionName);
+        // map from family to List of hfiles
+        for (Map.Entry<String,List<Pair<String, Boolean>>> famEntry :
+          regionEntry.getValue().entrySet()) {
+          String fam = famEntry.getKey();
+          Path famDir = new Path(regionDir, fam);
+          List<Path> files;
+          if (!mapForSrc[srcIdx].containsKey(fam.getBytes())) {
+            files = new ArrayList<Path>();
+            mapForSrc[srcIdx].put(fam.getBytes(), files);
+          } else {
+            files = mapForSrc[srcIdx].get(fam.getBytes());
+          }
+          Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
+          String tblName = srcTable.getQualifierAsString();
+          Path tgtFam = new Path(new Path(tgtTable, regionName), fam);
+          if (!tgtFs.mkdirs(tgtFam)) {
+            throw new IOException("couldn't create " + tgtFam);
+          }
+          for (Pair<String, Boolean> fileWithState : famEntry.getValue()) {
+            String file = fileWithState.getFirst();
+            boolean raw = fileWithState.getSecond();
+            int idx = file.lastIndexOf("/");
+            String filename = file;
+            if (idx > 0) {
+              filename = file.substring(idx+1);
+            }
+            Path p = new Path(famDir, filename);
+            Path tgt = new Path(tgtFam, filename);
+            Path archive = new Path(archiveDir, filename);
+            if (fs.exists(p)) {
+              if (LOG.isTraceEnabled()) {
+                LOG.trace("found bulk hfile " + file + " in " + famDir + " for " + tblName);
+              }
+              try {
+                if (LOG.isTraceEnabled()) {
+                  LOG.trace("copying " + p + " to " + tgt);
+                }
+                FileUtil.copy(fs, p, tgtFs, tgt, false, conf);
+              } catch (FileNotFoundException e) {
+                LOG.debug("copying archive " + archive + " to " + tgt);
+                try {
+                  FileUtil.copy(fs, archive, tgtFs, tgt, false, conf);
+                } catch (FileNotFoundException fnfe) {
+                  if (!raw) throw fnfe;
+                }
+              }
+            } else {
+              LOG.debug("copying archive " + archive + " to " + tgt);
+              try {
+                FileUtil.copy(fs, archive, tgtFs, tgt, false, conf);
+              } catch (FileNotFoundException fnfe) {
+                if (!raw) throw fnfe;
+              }
+            }
+            files.add(tgt);
+          }
+        }
+      }
+    }
+    backupManager.writeBulkLoadedFiles(sTableList, mapForSrc);
+    backupManager.removeBulkLoadedRows(sTableList, pair.getSecond());
+    return mapForSrc;
+  }
+
+  @Override
+  public void execute() throws IOException {
+
+    try {
+      // case PREPARE_INCREMENTAL:
+      beginBackup(backupManager, backupInfo);
+      backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
+      LOG.debug("For incremental backup, current table set is "
+          + backupManager.getIncrementalBackupTableSet());
+      newTimestamps =
+          ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
+    } catch (Exception e) {
+      // fail the overall backup and return
+      failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
+        BackupType.INCREMENTAL, conf);
+      return;
+    }
+
+    // case INCREMENTAL_COPY:
+    try {
+      // copy out the table and region info files for each table
+      BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
+      // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
+      convertWALsToHFiles(backupInfo);
+      incrementalCopyHFiles(backupInfo);
+      // Save list of WAL files copied
+      backupManager.recordWALFiles(backupInfo.getIncrBackupFileList());
+    } catch (Exception e) {
+      String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId;
+      // fail the overall backup and return
+      failBackup(conn, backupInfo, backupManager, e, msg, BackupType.INCREMENTAL, conf);
+      return;
+    }
+    // case INCR_BACKUP_COMPLETE:
+    // set overall backup status: complete. Here we make sure to complete the backup.
+    // After this checkpoint, even if a cancel request comes in, the backup is allowed to finish.
+    try {
+      // Set the previousTimestampMap which is before this current log roll to the manifest.
+      HashMap<TableName, HashMap<String, Long>> previousTimestampMap =
+          backupManager.readLogTimestampMap();
+      backupInfo.setIncrTimestampMap(previousTimestampMap);
+
+      // The table list in backupInfo is good for both full backup and incremental backup.
+      // For incremental backup, it contains the incremental backup table set.
+      backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
+
+      HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
+          backupManager.readLogTimestampMap();
+
+      Long newStartCode =
+          BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
+      backupManager.writeBackupStartCode(newStartCode);
+
+      handleBulkLoad(backupInfo.getTableNames());
+      // backup complete
+      completeBackup(conn, backupInfo, backupManager, BackupType.INCREMENTAL, conf);
+
+    } catch (IOException e) {
+      failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
+        BackupType.INCREMENTAL, conf);
+    }
+  }
+
+  protected void incrementalCopyHFiles(BackupInfo backupInfo) throws Exception {
+
+    try {
+      LOG.debug("Incremental copy HFiles is starting.");
+      // set overall backup phase: incremental_copy
+      backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
+      // get incremental backup file list and prepare params for DistCp
+      List<String> incrBackupFileList = new ArrayList<String>();
+      // Add Bulk output
+      incrBackupFileList.add(getBulkOutputDir().toString());
+      String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]);
+      strArr[strArr.length - 1] = backupInfo.getBackupRootDir();
+      BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
+      int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
+      if (res != 0) {
+        LOG.error("Copy incremental HFile files failed with return code: " + res + ".");
+        throw new IOException("Failed copy from " + StringUtils.join(incrBackupFileList, ',')
+            + " to " + backupInfo.getHLogTargetDir());
+      }
+      LOG.debug("Incremental copy HFiles from " + StringUtils.join(incrBackupFileList, ',')
+          + " to " + backupInfo.getBackupRootDir() + " finished.");
+    } finally {
+      deleteBulkLoadDirectory();
+    }
+  }
+
+  protected void deleteBulkLoadDirectory() throws IOException {
+    // delete original bulk load directory on method exit
+    Path path = getBulkOutputDir();
+    FileSystem fs = FileSystem.get(conf);
+    boolean result = fs.delete(path, true);
+    if (!result) {
+      LOG.warn("Could not delete " + path);
+    }
+
+  }
+
+  protected void convertWALsToHFiles(BackupInfo backupInfo) throws IOException {
+    // get incremental backup file list and prepare parameters for DistCp
+    List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
+    // Get list of tables in incremental backup set
+    Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
+    // filter missing files out (they have been copied by previous backups)
+    incrBackupFileList = filterMissingFiles(incrBackupFileList);
+    for (TableName table : tableSet) {
+      // Check if table exists
+      if (tableExists(table, conn)) {
+        walToHFiles(incrBackupFileList, table);
+      } else {
+        LOG.warn("Table " + table + " does not exists. Skipping in WAL converter");
+      }
+    }
+  }
+
+
+  protected boolean tableExists(TableName table, Connection conn) throws IOException {
+    try (Admin admin = conn.getAdmin();) {
+      return admin.tableExists(table);
+    }
+  }
+
+  protected void walToHFiles(List<String> dirPaths, TableName tableName) throws IOException {
+
+    Tool player = new WALPlayer();
+
+    // WALPlayer reads all files in an arbitrary directory structure and creates
+    // a Map task for each file. We use ';' as the separator
+    // because WAL file names contain ','
+    String dirs = StringUtils.join(dirPaths, ';');
+
+    Path bulkOutputPath = getBulkOutputDirForTable(tableName);
+    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
+    conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
+    String[] playerArgs = { dirs, tableName.getNameAsString() };
+
+    try {
+      player.setConf(conf);
+      int result = player.run(playerArgs);
+      if(result != 0) {
+        throw new IOException("WAL Player failed");
+      }
+      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception ee) {
+      throw new IOException("Can not convert from directory " + dirs
+          + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee);
+    }
+  }
+
+  protected Path getBulkOutputDirForTable(TableName table) {
+    Path tablePath = getBulkOutputDir();
+    tablePath = new Path(tablePath, table.getNamespaceAsString());
+    tablePath = new Path(tablePath, table.getQualifierAsString());
+    return new Path(tablePath, "data");
+  }
+
+  protected Path getBulkOutputDir() {
+    String backupId = backupInfo.getBackupId();
+    Path path = new Path(backupInfo.getBackupRootDir());
+    path = new Path(path, ".tmp");
+    path = new Path(path, backupId);
+    return path;
+  }
+
+}
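
A hedged usage sketch (not part of this commit) of how walToHFiles() above drives WALPlayer to
turn a set of WAL files into HFiles under a bulk-output directory. The WAL paths, bulk-output
path and table name below are placeholders; only the two WALPlayer configuration keys come from
the code above.

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.util.ToolRunner;

public class WalPlayerSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // WAL file names contain ',', so inputs are joined with ';' and WALPlayer
    // is told to split on that separator instead.
    String dirs = StringUtils.join(
        new String[] { "hdfs:///hbase/oldWALs/wal.1", "hdfs:///hbase/oldWALs/wal.2" }, ';');
    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, "hdfs:///backup/.tmp/backup_1/ns/table/data");
    conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
    // Same argument layout as walToHFiles(): input files first, then the table name
    int rc = ToolRunner.run(conf, new WALPlayer(), new String[] { dirs, "ns:table" });
    System.exit(rc);
  }
}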

http://git-wip-us.apache.org/repos/asf/hbase/blob/37c65946/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
new file mode 100644
index 0000000..ea7a7b8
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.RestoreRequest;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.backup.util.RestoreTool;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
+
+/**
+ * Restore table implementation
+ *
+ */
+@InterfaceAudience.Private
+public class RestoreTablesClient {
+  private static final Log LOG = LogFactory.getLog(RestoreTablesClient.class);
+
+  private Configuration conf;
+  private Connection conn;
+  private String backupId;
+  private TableName[] sTableArray;
+  private TableName[] tTableArray;
+  private String targetRootDir;
+  private boolean isOverwrite;
+
+  public RestoreTablesClient(Connection conn, RestoreRequest request) throws IOException {
+    this.targetRootDir = request.getBackupRootDir();
+    this.backupId = request.getBackupId();
+    this.sTableArray = request.getFromTables();
+    this.tTableArray = request.getToTables();
+    if (tTableArray == null || tTableArray.length == 0) {
+      this.tTableArray = sTableArray;
+    }
+    this.isOverwrite = request.isOverwrite();
+    this.conn = conn;
+    this.conf = conn.getConfiguration();
+
+  }
+
+  /**
+   * Validate target tables.
+   * @param tTableArray target tables
+   * @param isOverwrite overwrite existing tables
+   * @throws IOException exception
+   */
+  private void checkTargetTables(TableName[] tTableArray, boolean isOverwrite) throws IOException {
+    ArrayList<TableName> existTableList = new ArrayList<>();
+    ArrayList<TableName> disabledTableList = new ArrayList<>();
+
+    // check if the tables already exist
+    try (Admin admin = conn.getAdmin();) {
+      for (TableName tableName : tTableArray) {
+        if (admin.tableExists(tableName)) {
+          existTableList.add(tableName);
+          if (admin.isTableDisabled(tableName)) {
+            disabledTableList.add(tableName);
+          }
+        } else {
+          LOG.info("HBase table " + tableName
+              + " does not exist. It will be created during restore process");
+        }
+      }
+    }
+
+    if (existTableList.size() > 0) {
+      if (!isOverwrite) {
+        LOG.error("Existing table (" + existTableList
+            + ") found in the restore target, please add "
+            + "\"-overwrite\" option in the command if you mean"
+            + " to restore to these existing tables");
+        throw new IOException("Existing table found in target while no \"-overwrite\" "
+            + "option found");
+      } else {
+        if (disabledTableList.size() > 0) {
+          LOG.error("Found offline table in the restore target, "
+              + "please enable them before restore with \"-overwrite\" option");
+          LOG.info("Offline table list in restore target: " + disabledTableList);
+          throw new IOException(
+              "Found offline table in the target when restore with \"-overwrite\" option");
+        }
+      }
+    }
+  }
+
+  /**
+   * Restore operation: handle each BackupImage in the array.
+   * @param images array of BackupImage, the full backup image first
+   * @param sTable table to be restored
+   * @param tTable table to restore to
+   * @param truncateIfExists truncate the target table if it already exists
+   * @throws IOException exception
+   */
+
+  private void restoreImages(BackupImage[] images, TableName sTable, TableName tTable,
+      boolean truncateIfExists) throws IOException {
+
+    // First image MUST be image of a FULL backup
+    BackupImage image = images[0];
+    String rootDir = image.getRootDir();
+    String backupId = image.getBackupId();
+    Path backupRoot = new Path(rootDir);
+    RestoreTool restoreTool = new RestoreTool(conf, backupRoot, backupId);
+    Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId);
+    String lastIncrBackupId = images.length == 1 ? null : images[images.length - 1].getBackupId();
+    // We need hFS only for full restore (see the code)
+    BackupManifest manifest = HBackupFileSystem.getManifest(conf, backupRoot, backupId);
+    if (manifest.getType() == BackupType.FULL) {
+      LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full" + " backup image "
+          + tableBackupPath.toString());
+      restoreTool.fullRestoreTable(conn, tableBackupPath, sTable, tTable, truncateIfExists,
+        lastIncrBackupId);
+    } else { // incremental Backup
+      throw new IOException("Unexpected backup type " + image.getType());
+    }
+
+    if (images.length == 1) {
+      // full backup restore done
+      return;
+    }
+
+    List<Path> dirList = new ArrayList<Path>();
+    // the full backup image (images[0]) was restored above;
+    // add the incremental image data dirs in time order
+    for (int i = 1; i < images.length; i++) {
+      BackupImage im = images[i];
+      String fileBackupDir =
+          HBackupFileSystem.getTableBackupDataDir(im.getRootDir(), im.getBackupId(), sTable);
+      dirList.add(new Path(fileBackupDir));
+    }
+
+    String dirs = StringUtils.join(dirList, ",");
+    LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from log dirs: " + dirs);
+    Path[] paths = new Path[dirList.size()];
+    dirList.toArray(paths);
+    restoreTool.incrementalRestoreTable(conn, tableBackupPath, paths, new TableName[] { sTable },
+      new TableName[] { tTable }, lastIncrBackupId);
+    LOG.info(sTable + " has been successfully restored to " + tTable);
+  }
+
+  /**
+   * Restore operation. Stage 2: resolve backup image dependencies and restore.
+   * @param backupManifestMap map from table name to its BackupManifest
+   * @param sTableArray the array of tables to be restored
+   * @param tTableArray the array of mapping tables to restore to
+   * @param isOverwrite whether to overwrite existing tables
+   * @throws IOException exception
+   */
+  private void restore(HashMap<TableName, BackupManifest> backupManifestMap,
+      TableName[] sTableArray, TableName[] tTableArray, boolean isOverwrite) throws IOException {
+    TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>();
+    boolean truncateIfExists = isOverwrite;
+    Set<String> backupIdSet = new HashSet<>();
+
+    for (int i = 0; i < sTableArray.length; i++) {
+      TableName table = sTableArray[i];
+
+      BackupManifest manifest = backupManifestMap.get(table);
+      // Get the image list of this backup for restore in time order from old
+      // to new.
+      List<BackupImage> list = new ArrayList<BackupImage>();
+      list.add(manifest.getBackupImage());
+      TreeSet<BackupImage> set = new TreeSet<BackupImage>(list);
+      List<BackupImage> depList = manifest.getDependentListByTable(table);
+      set.addAll(depList);
+      BackupImage[] arr = new BackupImage[set.size()];
+      set.toArray(arr);
+      restoreImages(arr, table, tTableArray[i], truncateIfExists);
+      restoreImageSet.addAll(list);
+      if (restoreImageSet != null && !restoreImageSet.isEmpty()) {
+        LOG.info("Restore includes the following image(s):");
+        for (BackupImage image : restoreImageSet) {
+          LOG.info("Backup: " + image.getBackupId() + " "
+              + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), table));
+          if (image.getType() == BackupType.INCREMENTAL) {
+            backupIdSet.add(image.getBackupId());
+            LOG.debug("adding " + image.getBackupId() + " for bulk load");
+          }
+        }
+      }
+    }
+    try (BackupSystemTable table = new BackupSystemTable(conn)) {
+      List<TableName> sTableList = Arrays.asList(sTableArray);
+      for (String id : backupIdSet) {
+        LOG.debug("restoring bulk load for " + id);
+        Map<byte[], List<Path>>[] mapForSrc = table.readBulkLoadedFiles(id, sTableList);
+        Map<LoadQueueItem, ByteBuffer> loaderResult;
+        conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
+        LoadIncrementalHFiles loader = BackupUtils.createLoader(conf);
+        for (int i = 0; i < sTableList.size(); i++) {
+          if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) {
+            loaderResult = loader.run(null, mapForSrc[i], tTableArray[i]);
+            LOG.debug("bulk loading " + sTableList.get(i) + " to " + tTableArray[i]);
+            if (loaderResult.isEmpty()) {
+              String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " + tTableArray[i];
+              LOG.error(msg);
+              throw new IOException(msg);
+            }
+          }
+        }
+      }
+    }
+    LOG.debug("restoreStage finished");
+  }
+
+  static long getTsFromBackupId(String backupId) {
+    if (backupId == null) {
+      return 0;
+    }
+    return Long.parseLong(backupId.substring(backupId.lastIndexOf("_") + 1));
+  }
+
+  static boolean withinRange(long a, long lower, long upper) {
+    if (a < lower || a > upper) {
+      return false;
+    }
+    return true;
+  }
+
+  public void execute() throws IOException {
+
+    // case VALIDATION:
+    // check the target tables
+    checkTargetTables(tTableArray, isOverwrite);
+
+    // case RESTORE_IMAGES:
+    HashMap<TableName, BackupManifest> backupManifestMap = new HashMap<>();
+    // check and load backup image manifest for the tables
+    Path rootPath = new Path(targetRootDir);
+    HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath,
+      backupId);
+
+    restore(backupManifestMap, sTableArray, tTableArray, isOverwrite);
+  }
+
+}
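
A small illustrative sketch (not part of this commit) of the package-private helpers above,
assuming backup ids end in "_<timestamp-millis>" as getTsFromBackupId() expects. It has to sit
in the same package (org.apache.hadoop.hbase.backup.impl) to reach the package-private methods;
the backup id and the bounds are made-up values.

package org.apache.hadoop.hbase.backup.impl;

public class BackupIdRangeSketch {
  public static void main(String[] args) {
    // getTsFromBackupId() returns the millisecond timestamp encoded after the last '_'
    long ts = RestoreTablesClient.getTsFromBackupId("backup_1503512849000");
    // withinRange() checks that the timestamp falls inside [lower, upper]
    boolean inWindow = RestoreTablesClient.withinRange(ts, 1503500000000L, 1503600000000L);
    System.out.println(ts + " within range: " + inWindow);
  }
}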

http://git-wip-us.apache.org/repos/asf/hbase/blob/37c65946/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
new file mode 100644
index 0000000..6eec460
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
@@ -0,0 +1,436 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
+import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
+import org.apache.hadoop.hbase.backup.BackupRequest;
+import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Base class for backup operations. Concrete implementations for
+ * full and incremental backup are delegated to the corresponding subclasses:
+ * {@link FullTableBackupClient} and {@link IncrementalTableBackupClient}
+ *
+ */
+@InterfaceAudience.Private
+public abstract class TableBackupClient {
+
+  public static final String BACKUP_CLIENT_IMPL_CLASS = "backup.client.impl.class";
+
+  @VisibleForTesting
+  public static final String BACKUP_TEST_MODE_STAGE = "backup.test.mode.stage";
+
+  private static final Log LOG = LogFactory.getLog(TableBackupClient.class);
+
+  protected Configuration conf;
+  protected Connection conn;
+  protected String backupId;
+  protected List<TableName> tableList;
+  protected HashMap<String, Long> newTimestamps = null;
+
+  protected BackupManager backupManager;
+  protected BackupInfo backupInfo;
+
+  public TableBackupClient() {
+  }
+
+  public TableBackupClient(final Connection conn, final String backupId, BackupRequest request)
+      throws IOException {
+    init(conn, backupId, request);
+  }
+
+  public void init(final Connection conn, final String backupId, BackupRequest request)
+      throws IOException {
+    if (request.getBackupType() == BackupType.FULL) {
+      backupManager = new BackupManager(conn, conn.getConfiguration());
+    } else {
+      backupManager = new IncrementalBackupManager(conn, conn.getConfiguration());
+    }
+    this.backupId = backupId;
+    this.tableList = request.getTableList();
+    this.conn = conn;
+    this.conf = conn.getConfiguration();
+    backupInfo =
+        backupManager.createBackupInfo(backupId, request.getBackupType(), tableList,
+          request.getTargetRootDir(), request.getTotalTasks(), request.getBandwidth());
+    if (tableList == null || tableList.isEmpty()) {
+      this.tableList = new ArrayList<>(backupInfo.getTables());
+    }
+    // Start new session
+    backupManager.startBackupSession();
+  }
+
+  /**
+   * Begin the overall backup.
+   * @param backupInfo backup info
+   * @throws IOException exception
+   */
+  protected void beginBackup(BackupManager backupManager, BackupInfo backupInfo)
+      throws IOException {
+
+    BackupSystemTable.snapshot(conn);
+    backupManager.setBackupInfo(backupInfo);
+    // set the start timestamp of the overall backup
+    long startTs = EnvironmentEdgeManager.currentTime();
+    backupInfo.setStartTs(startTs);
+    // set overall backup status: ongoing
+    backupInfo.setState(BackupState.RUNNING);
+    backupInfo.setPhase(BackupPhase.REQUEST);
+    LOG.info("Backup " + backupInfo.getBackupId() + " started at " + startTs + ".");
+
+    backupManager.updateBackupInfo(backupInfo);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Backup session " + backupInfo.getBackupId() + " has been started.");
+    }
+  }
+
+  protected String getMessage(Exception e) {
+    String msg = e.getMessage();
+    if (msg == null || msg.equals("")) {
+      msg = e.getClass().getName();
+    }
+    return msg;
+  }
+
+  /**
+   * Delete HBase snapshots created for the backup.
+   * @param backupInfo backup info
+   * @throws IOException exception
+   */
+  protected static void deleteSnapshots(final Connection conn, BackupInfo backupInfo, Configuration conf)
+      throws IOException {
+    LOG.debug("Trying to delete snapshot for full backup.");
+    for (String snapshotName : backupInfo.getSnapshotNames()) {
+      if (snapshotName == null) {
+        continue;
+      }
+      LOG.debug("Trying to delete snapshot: " + snapshotName);
+
+      try (Admin admin = conn.getAdmin();) {
+        admin.deleteSnapshot(snapshotName);
+      }
+      LOG.debug("Deleting the snapshot " + snapshotName + " for backup " + backupInfo.getBackupId()
+          + " succeeded.");
+    }
+  }
+
+  /**
+   * Clean up directories with prefix "exportSnapshot-", which are generated when exporting
+   * snapshots.
+   * @throws IOException exception
+   */
+  protected static void cleanupExportSnapshotLog(Configuration conf) throws IOException {
+    FileSystem fs = FSUtils.getCurrentFileSystem(conf);
+    Path stagingDir =
+        new Path(conf.get(BackupRestoreConstants.CONF_STAGING_ROOT, fs.getWorkingDirectory()
+            .toString()));
+    FileStatus[] files = FSUtils.listStatus(fs, stagingDir);
+    if (files == null) {
+      return;
+    }
+    for (FileStatus file : files) {
+      if (file.getPath().getName().startsWith("exportSnapshot-")) {
+        LOG.debug("Delete log files of exporting snapshot: " + file.getPath().getName());
+        if (!FSUtils.delete(fs, file.getPath(), true)) {
+          LOG.warn("Can not delete " + file.getPath());
+        }
+      }
+    }
+  }
+
+  /**
+   * Clean up the uncompleted data at target directory if the ongoing backup has already entered
+   * the copy phase.
+   */
+  protected static void cleanupTargetDir(BackupInfo backupInfo, Configuration conf) {
+    try {
+      // clean up the uncompleted data at target directory if the ongoing backup has already entered
+      // the copy phase
+      LOG.debug("Trying to cleanup up target dir. Current backup phase: "
+          + backupInfo.getPhase());
+      if (backupInfo.getPhase().equals(BackupPhase.SNAPSHOTCOPY)
+          || backupInfo.getPhase().equals(BackupPhase.INCREMENTAL_COPY)
+          || backupInfo.getPhase().equals(BackupPhase.STORE_MANIFEST)) {
+        FileSystem outputFs =
+            FileSystem.get(new Path(backupInfo.getBackupRootDir()).toUri(), conf);
+
+        // now treat one backup as a transaction, clean up data that has been partially copied at
+        // table level
+        for (TableName table : backupInfo.getTables()) {
+          Path targetDirPath =
+              new Path(HBackupFileSystem.getTableBackupDir(backupInfo.getBackupRootDir(),
+                backupInfo.getBackupId(), table));
+          if (outputFs.delete(targetDirPath, true)) {
+            LOG.debug("Cleaning up uncompleted backup data at " + targetDirPath.toString()
+                + " done.");
+          } else {
+            LOG.debug("No data has been copied to " + targetDirPath.toString() + ".");
+          }
+
+          Path tableDir = targetDirPath.getParent();
+          FileStatus[] backups = FSUtils.listStatus(outputFs, tableDir);
+          if (backups == null || backups.length == 0) {
+            outputFs.delete(tableDir, true);
+            LOG.debug(tableDir.toString() + " is empty, remove it.");
+          }
+        }
+      }
+
+    } catch (IOException e1) {
+      LOG.error("Cleaning up uncompleted backup data of " + backupInfo.getBackupId() + " at "
+          + backupInfo.getBackupRootDir() + " failed due to " + e1.getMessage() + ".");
+    }
+  }
+
+  /**
+   * Fail the overall backup.
+   * @param backupInfo backup info
+   * @param e exception
+   * @throws IOException exception
+   */
+  protected void failBackup(Connection conn, BackupInfo backupInfo, BackupManager backupManager,
+      Exception e, String msg, BackupType type, Configuration conf) throws IOException {
+
+    try {
+      LOG.error(msg + getMessage(e), e);
+      // If this is a cancel exception, then we've already cleaned.
+      // set the failure timestamp of the overall backup
+      backupInfo.setCompleteTs(EnvironmentEdgeManager.currentTime());
+      // set failure message
+      backupInfo.setFailedMsg(e.getMessage());
+      // set overall backup status: failed
+      backupInfo.setState(BackupState.FAILED);
+      // compose the backup failed data
+      String backupFailedData =
+          "BackupId=" + backupInfo.getBackupId() + ",startts=" + backupInfo.getStartTs()
+              + ",failedts=" + backupInfo.getCompleteTs() + ",failedphase=" + backupInfo.getPhase()
+              + ",failedmessage=" + backupInfo.getFailedMsg();
+      LOG.error(backupFailedData);
+      cleanupAndRestoreBackupSystem(conn, backupInfo, conf);
+      // If backup session is updated to FAILED state - means we
+      // processed recovery already.
+      backupManager.updateBackupInfo(backupInfo);
+      backupManager.finishBackupSession();
+      LOG.error("Backup " + backupInfo.getBackupId() + " failed.");
+    } catch (IOException ee) {
+      LOG.error("Please run backup repair tool manually to restore backup system integrity");
+      throw ee;
+    }
+  }
+
+  public static void cleanupAndRestoreBackupSystem(Connection conn, BackupInfo backupInfo,
+      Configuration conf) throws IOException {
+    BackupType type = backupInfo.getType();
+    // if full backup, then delete HBase snapshots if there already are snapshots taken
+    // and also clean up export snapshot log files if they exist
+    if (type == BackupType.FULL) {
+      deleteSnapshots(conn, backupInfo, conf);
+      cleanupExportSnapshotLog(conf);
+    }
+    BackupSystemTable.restoreFromSnapshot(conn);
+    BackupSystemTable.deleteSnapshot(conn);
+    // clean up the uncompleted data at the target directory if the ongoing backup has already
+    // entered the copy phase
+    // For incremental backup, DistCp logs will be cleaned up along with the targetDir.
+    cleanupTargetDir(backupInfo, conf);
+  }
+
+  /**
+   * Add manifest for the current backup. The manifest is stored within the table backup directory.
+   * @param backupInfo The current backup info
+   * @throws IOException exception
+   * @throws BackupException exception
+   */
+  protected void addManifest(BackupInfo backupInfo, BackupManager backupManager, BackupType type,
+      Configuration conf) throws IOException, BackupException {
+    // set the overall backup phase : store manifest
+    backupInfo.setPhase(BackupPhase.STORE_MANIFEST);
+
+    BackupManifest manifest;
+
+    // Since we have each table's backup in its own directory structure,
+    // we'll store its manifest with the table directory.
+    for (TableName table : backupInfo.getTables()) {
+      manifest = new BackupManifest(backupInfo, table);
+      ArrayList<BackupImage> ancestors = backupManager.getAncestors(backupInfo, table);
+      for (BackupImage image : ancestors) {
+        manifest.addDependentImage(image);
+      }
+
+      if (type == BackupType.INCREMENTAL) {
+        // We'll store the log timestamps for this table only in its manifest.
+        HashMap<TableName, HashMap<String, Long>> tableTimestampMap =
+            new HashMap<TableName, HashMap<String, Long>>();
+        tableTimestampMap.put(table, backupInfo.getIncrTimestampMap().get(table));
+        manifest.setIncrTimestampMap(tableTimestampMap);
+        ArrayList<BackupImage> allAncestors = backupManager.getAncestors(backupInfo);
+        for (BackupImage image : allAncestors) {
+          manifest.addDependentImage(image);
+        }
+      }
+      manifest.store(conf);
+    }
+
+    // For incremental backup, we store an overall manifest in
+    // <backup-root-dir>/WALs/<backup-id>.
+    // This is used when creating the next incremental backup.
+    if (type == BackupType.INCREMENTAL) {
+      manifest = new BackupManifest(backupInfo);
+      // set the table region server start and end timestamps for incremental backup
+      manifest.setIncrTimestampMap(backupInfo.getIncrTimestampMap());
+      ArrayList<BackupImage> ancestors = backupManager.getAncestors(backupInfo);
+      for (BackupImage image : ancestors) {
+        manifest.addDependentImage(image);
+      }
+      manifest.store(conf);
+    }
+  }
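+
+  // Resulting layout (illustrative; exact paths follow the backup filesystem conventions):
+  //   <backup-root-dir>/<backup-id>/.../<table>/   - per-table manifest, one per backed-up table
+  //   <backup-root-dir>/WALs/<backup-id>/          - overall manifest, incremental backups only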
+
+  /**
+   * Get backup request metadata as a string.
+   * @param backupInfo backup info
+   * @return backup request metadata string
+   */
+  protected String obtainBackupMetaDataStr(BackupInfo backupInfo) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("type=" + backupInfo.getType() + ",tablelist=");
+    for (TableName table : backupInfo.getTables()) {
+      sb.append(table + ";");
+    }
+    if (sb.lastIndexOf(";") > 0) {
+      sb.delete(sb.lastIndexOf(";"), sb.lastIndexOf(";") + 1);
+    }
+    sb.append(",targetRootDir=" + backupInfo.getBackupRootDir());
+
+    return sb.toString();
+  }
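+
+  // For example (illustrative only; table names and root dir are hypothetical), a full backup of
+  // tables t1 and t2 produces a string of the form:
+  //   type=FULL,tablelist=t1;t2,targetRootDir=hdfs://backup-root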
+
+  /**
+   * Clean up directories with prefix "_distcp_logs-", which are generated when DistCp copies
+   * WAL files.
+   * @param backupInfo backup info
+   * @param conf configuration
+   * @throws IOException exception
+   */
+  protected void cleanupDistCpLog(BackupInfo backupInfo, Configuration conf) throws IOException {
+    Path rootPath = new Path(backupInfo.getHLogTargetDir()).getParent();
+    FileSystem fs = FileSystem.get(rootPath.toUri(), conf);
+    FileStatus[] files = FSUtils.listStatus(fs, rootPath);
+    if (files == null) {
+      return;
+    }
+    for (FileStatus file : files) {
+      if (file.getPath().getName().startsWith("_distcp_logs")) {
+        LOG.debug("Delete log files of DistCp: " + file.getPath().getName());
+        FSUtils.delete(fs, file.getPath(), true);
+      }
+    }
+  }
+
+  /**
+   * Complete the overall backup.
+   * @param conn HBase connection
+   * @param backupInfo backup info
+   * @param backupManager backup manager
+   * @param type backup type
+   * @param conf configuration
+   * @throws IOException exception
+   */
+  protected void completeBackup(final Connection conn, BackupInfo backupInfo,
+      BackupManager backupManager, BackupType type, Configuration conf) throws IOException {
+    // set the complete timestamp of the overall backup
+    backupInfo.setCompleteTs(EnvironmentEdgeManager.currentTime());
+    // set overall backup status: complete
+    backupInfo.setState(BackupState.COMPLETE);
+    backupInfo.setProgress(100);
+    // add and store the manifest for the backup
+    addManifest(backupInfo, backupManager, type, conf);
+
+    // compose the backup complete data
+    String backupCompleteData =
+        obtainBackupMetaDataStr(backupInfo) + ",startts=" + backupInfo.getStartTs()
+            + ",completets=" + backupInfo.getCompleteTs() + ",bytescopied="
+            + backupInfo.getTotalBytesCopied();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Backup " + backupInfo.getBackupId() + " finished: " + backupCompleteData);
+    }
+
+    // when full backup is done:
+    // - delete HBase snapshot
+    // - clean up directories with prefix "exportSnapshot-", which are generated when exporting
+    // snapshots
+    if (type == BackupType.FULL) {
+      deleteSnapshots(conn, backupInfo, conf);
+      cleanupExportSnapshotLog(conf);
+    } else if (type == BackupType.INCREMENTAL) {
+      cleanupDistCpLog(backupInfo, conf);
+    }
+    BackupSystemTable.deleteSnapshot(conn);
+    backupManager.updateBackupInfo(backupInfo);
+
+    // Finish active session
+    backupManager.finishBackupSession();
+
+    LOG.info("Backup " + backupInfo.getBackupId() + " completed.");
+  }
+
+  /**
+   * Backup request execution.
+   * @throws IOException if the backup request fails
+   */
+  public abstract void execute() throws IOException;
+
+  @VisibleForTesting
+  protected Stage getTestStage() {
+    return Stage.valueOf("stage_"+ conf.getInt(BACKUP_TEST_MODE_STAGE, 0));
+  }
+
+  @VisibleForTesting
+  protected void failStageIf(Stage stage) throws IOException {
+    Stage current = getTestStage();
+    if (current == stage) {
+      throw new IOException("Failed stage " + stage+" in testing");
+    }
+  }
+
+  public enum Stage {
+    stage_0, stage_1, stage_2, stage_3, stage_4
+  }
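+
+  // Illustrative use of the test stages (a minimal sketch; the calling test code is an
+  // assumption for illustration only):
+  //   conf.setInt(BACKUP_TEST_MODE_STAGE, 2);   // request a simulated failure at stage_2
+  //   ...
+  //   failStageIf(Stage.stage_2);               // throws IOException("Failed stage stage_2 in testing")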
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/37c65946/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
new file mode 100644
index 0000000..016d1a4
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupCopyJob;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.DistCpOptions;
+
+/**
+ * MapReduce implementation of {@link BackupCopyJob}. There are two types of copy operation:
+ * copying from a snapshot, which extends ExportSnapshot, and copying incremental log files,
+ * which extends DistCp.
+ */
+@InterfaceAudience.Private
+public class MapReduceBackupCopyJob implements BackupCopyJob {
+  private static final Log LOG = LogFactory.getLog(MapReduceBackupCopyJob.class);
+
+  private Configuration conf;
+
+  // Accumulated progress within the whole backup process for the copy operation
+  private float progressDone = 0.1f;
+  private long bytesCopied = 0;
+  private static final float INIT_PROGRESS = 0.1f;
+
+  // The percentage of the overall task assigned to the current copy task, when multiple copy
+  // tasks are needed. The default is 100%, i.e. a single copy task covers the whole backup.
+  private float subTaskPercntgInWholeTask = 1f;
+
+  public MapReduceBackupCopyJob() {
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Get the current copy task percentage within the whole task if multiple copies are needed.
+   * @return the current copy task percentage
+   */
+  public float getSubTaskPercntgInWholeTask() {
+    return subTaskPercntgInWholeTask;
+  }
+
+  /**
+   * Set the current copy task percentage within the whole task if multiple copies are needed. Must
+   * be called before calling
+   * {@link #copy(BackupInfo, BackupManager, Configuration, BackupType, String[])}
+   * @param subTaskPercntgInWholeTask The percentage of the copy subtask
+   */
+  public void setSubTaskPercntgInWholeTask(float subTaskPercntgInWholeTask) {
+    this.subTaskPercntgInWholeTask = subTaskPercntgInWholeTask;
+  }
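+
+  // For example (a hedged sketch): if the overall copy work is split into two passes of roughly
+  // equal size, a caller could invoke setSubTaskPercntgInWholeTask(0.5f) before each pass so that
+  // each pass contributes half of the remaining progress range (see the newProgress formula in
+  // BackupDistCp.execute()). With a single pass, the default of 1.0f applies.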
+
+  static class SnapshotCopy extends ExportSnapshot {
+    private BackupInfo backupInfo;
+    private TableName table;
+
+    public SnapshotCopy(BackupInfo backupInfo, TableName table) {
+      super();
+      this.backupInfo = backupInfo;
+      this.table = table;
+    }
+
+    public TableName getTable() {
+      return this.table;
+    }
+
+    public BackupInfo getBackupInfo() {
+      return this.backupInfo;
+    }
+  }
+
+  /**
+   * Update the ongoing backup with new progress.
+   * @param backupInfo backup info
+   * @param backupManager backup manager
+   * @param newProgress progress
+   * @param bytesCopied bytes copied
+   * @throws IOException exception
+   */
+  static void updateProgress(BackupInfo backupInfo, BackupManager backupManager,
+      int newProgress, long bytesCopied) throws IOException {
+    // compose the new backup progress data, using fake number for now
+    String backupProgressData = newProgress + "%";
+
+    backupInfo.setProgress(newProgress);
+    backupManager.updateBackupInfo(backupInfo);
+    LOG.debug("Backup progress data \"" + backupProgressData
+        + "\" has been updated to backup system table for " + backupInfo.getBackupId());
+  }
+
+  /**
+   * Extends DistCp for progress updating to the backup system table
+   * during backup. Uses DistCpV2 (MAPREDUCE-2765) and overrides the
+   * execute() method to obtain the Job reference for progress updating.
+   * Only the arguments "src1, [src2, [...]] dst" are supported,
+   * no other DistCp options.
+   */
+  class BackupDistCp extends DistCp {
+
+    private BackupInfo backupInfo;
+    private BackupManager backupManager;
+
+    public BackupDistCp(Configuration conf, DistCpOptions options, BackupInfo backupInfo,
+        BackupManager backupManager) throws Exception {
+      super(conf, options);
+      this.backupInfo = backupInfo;
+      this.backupManager = backupManager;
+    }
+
+    @Override
+    public Job execute() throws Exception {
+
+      // reflection preparation for private methods and fields
+      Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class;
+      Method methodCreateMetaFolderPath = classDistCp.getDeclaredMethod("createMetaFolderPath");
+      Method methodCreateJob = classDistCp.getDeclaredMethod("createJob");
+      Method methodCreateInputFileListing =
+          classDistCp.getDeclaredMethod("createInputFileListing", Job.class);
+      Method methodCleanup = classDistCp.getDeclaredMethod("cleanup");
+
+      Field fieldInputOptions = classDistCp.getDeclaredField("inputOptions");
+      Field fieldMetaFolder = classDistCp.getDeclaredField("metaFolder");
+      Field fieldJobFS = classDistCp.getDeclaredField("jobFS");
+      Field fieldSubmitted = classDistCp.getDeclaredField("submitted");
+
+      methodCreateMetaFolderPath.setAccessible(true);
+      methodCreateJob.setAccessible(true);
+      methodCreateInputFileListing.setAccessible(true);
+      methodCleanup.setAccessible(true);
+
+      fieldInputOptions.setAccessible(true);
+      fieldMetaFolder.setAccessible(true);
+      fieldJobFS.setAccessible(true);
+      fieldSubmitted.setAccessible(true);
+
+      // execute() logic starts here
+      assert fieldInputOptions.get(this) != null;
+
+      Job job = null;
+      try {
+        synchronized (this) {
+          // Don't cleanup while we are setting up.
+          fieldMetaFolder.set(this, methodCreateMetaFolderPath.invoke(this));
+          fieldJobFS.set(this, ((Path) fieldMetaFolder.get(this)).getFileSystem(super.getConf()));
+          job = (Job) methodCreateJob.invoke(this);
+        }
+        methodCreateInputFileListing.invoke(this, job);
+
+        // Get the total length of the source files
+        List<Path> srcs = ((DistCpOptions) fieldInputOptions.get(this)).getSourcePaths();
+
+        long totalSrcLgth = 0;
+        for (Path aSrc : srcs) {
+          totalSrcLgth +=
+              BackupUtils.getFilesLength(aSrc.getFileSystem(super.getConf()), aSrc);
+        }
+
+        // submit the copy job
+        job.submit();
+        fieldSubmitted.set(this, true);
+
+        // after submitting the MR job, set its handler in the backup handler for cancel processing
+        // this.backupHandler.copyJob = job;
+
+        // Update the copy progress in the backup system table whenever the progress value
+        // changes, polling at hbase.backup.progressreport.frequency (default 500 ms)
+        int progressReportFreq =
+            MapReduceBackupCopyJob.this.getConf().getInt("hbase.backup.progressreport.frequency",
+              500);
+        float lastProgress = progressDone;
+        while (!job.isComplete()) {
+          float newProgress =
+              progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
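+          // Worked example (illustrative numbers): with progressDone = 0.1, mapProgress() = 0.5,
+          // subTaskPercntgInWholeTask = 1.0 and INIT_PROGRESS = 0.1, newProgress is
+          // 0.1 + 0.5 * 1.0 * 0.9 = 0.55, reported below as 55.0%.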
+
+          if (newProgress > lastProgress) {
+
+            BigDecimal progressData =
+                new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP);
+            String newProgressStr = progressData + "%";
+            LOG.info("Progress: " + newProgressStr);
+            updateProgress(backupInfo, backupManager, progressData.intValue(), bytesCopied);
+            LOG.debug("Backup progress data updated to backup system table: \"Progress: "
+                + newProgressStr + ".\"");
+            lastProgress = newProgress;
+          }
+          Thread.sleep(progressReportFreq);
+        }
+        // update the progress data after the copy job completes
+        float newProgress =
+            progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
+        BigDecimal progressData =
+            new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP);
+
+        String newProgressStr = progressData + "%";
+        LOG.info("Progress: " + newProgressStr + " subTask: " + subTaskPercntgInWholeTask
+            + " mapProgress: " + job.mapProgress());
+
+        // accumulate the overall backup progress
+        progressDone = newProgress;
+        bytesCopied += totalSrcLgth;
+
+        updateProgress(backupInfo, backupManager, progressData.intValue(), bytesCopied);
+        LOG.debug("Backup progress data updated to backup system table: \"Progress: "
+            + newProgressStr + " - " + bytesCopied + " bytes copied.\"");
+      } catch (Throwable t) {
+        LOG.error("distcp " + job == null ? "" : job.getJobID() + " encountered error", t);
+        throw t;
+      } finally {
+        if (!fieldSubmitted.getBoolean(this)) {
+          methodCleanup.invoke(this);
+        }
+      }
+
+      String jobID = job.getJobID().toString();
+      job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
+
+      LOG.debug("DistCp job-id: " + jobID + " completed: " + job.isComplete() + " "
+          + job.isSuccessful());
+      Counters ctrs = job.getCounters();
+      LOG.debug(ctrs);
+      if (job.isComplete() && !job.isSuccessful()) {
+        throw new Exception("DistCp job-id: " + jobID + " failed");
+      }
+
+      return job;
+    }
+
+  }
+
+  /**
+   * Do backup copy based on different types.
+   * @param context The backup info
+   * @param backupManager The backup manager
+   * @param conf The hadoop configuration
+   * @param copyType The backup copy type
+   * @param options Options for customized ExportSnapshot or DistCp
+   * @return the result code of the copy operation
+   * @throws IOException exception
+   */
+  @Override
+  public int copy(BackupInfo context, BackupManager backupManager, Configuration conf,
+      BackupType copyType, String[] options) throws IOException {
+    int res = 0;
+
+    try {
+      if (copyType == BackupType.FULL) {
+        SnapshotCopy snapshotCp = new SnapshotCopy(context, context.getTableBySnapshot(options[1]));
+        LOG.debug("Doing SNAPSHOT_COPY");
+        // Make a new instance of conf to be used by the snapshot copy class.
+        snapshotCp.setConf(new Configuration(conf));
+        res = snapshotCp.run(options);
+
+      } else if (copyType == BackupType.INCREMENTAL) {
+        LOG.debug("Doing COPY_TYPE_DISTCP");
+        setSubTaskPercntgInWholeTask(1f);
+
+        BackupDistCp distcp =
+            new BackupDistCp(new Configuration(conf), null, context, backupManager);
+        // Handle a special case where the source file is a single file.
+        // In this case, DistCp will not create the target dir. It just takes the
+        // target as a file name and copies the source file to the target (as a file name).
+        // We need to create the target dir before running DistCp.
+        LOG.debug("DistCp options: " + Arrays.toString(options));
+        Path dest = new Path(options[options.length - 1]);
+        FileSystem destfs = dest.getFileSystem(conf);
+        if (!destfs.exists(dest)) {
+          destfs.mkdirs(dest);
+        }
+        res = distcp.run(options);
+      }
+      return res;
+
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
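+
+  // Illustrative invocation for the incremental (DistCp) path (a sketch; the paths, backup id
+  // and surrounding objects are hypothetical):
+  //   MapReduceBackupCopyJob copyJob = new MapReduceBackupCopyJob();
+  //   copyJob.setConf(conf);
+  //   String[] opts = { "hdfs://src/oldWALs/wal.1", "hdfs://src/oldWALs/wal.2",
+  //       "hdfs://backup-root/backup_1503000000000/WALs" };   // "src1 [src2 ...] dst"
+  //   int rc = copyJob.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, opts);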
+
+  @Override
+  public void cancel(String jobId) throws IOException {
+    JobID id = JobID.forName(jobId);
+    Cluster cluster = new Cluster(this.getConf());
+    try {
+      Job job = cluster.getJob(id);
+      if (job == null) {
+        LOG.error("No job found for " + id);
+        // should we throw an exception here?
+        return;
+      }
+      if (job.isComplete() || job.isRetired()) {
+        return;
+      }
+
+      job.killJob();
+      LOG.debug("Killed copy job " + id);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/37c65946/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
new file mode 100644
index 0000000..00c5b83
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupMergeJob;
+import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.util.Tool;
+
+/**
+ * MapReduce implementation of {@link BackupMergeJob}.
+ * Must be initialized with the configuration of a backup destination cluster.
+ */
+@InterfaceAudience.Private
+public class MapReduceBackupMergeJob implements BackupMergeJob {
+  public static final Log LOG = LogFactory.getLog(MapReduceBackupMergeJob.class);
+
+  protected Tool player;
+  protected Configuration conf;
+
+  public MapReduceBackupMergeJob() {
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public void run(String[] backupIds) throws IOException {
+    String bulkOutputConfKey;
+
+    // TODO : run player on remote cluster
+    player = new MapReduceHFileSplitterJob();
+    bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
+    // The player reads all files in an arbitrary directory structure and creates
+    // a map task for each file
+    String bids = StringUtils.join(backupIds, ",");
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Merge backup images " + bids);
+    }
+
+    List<Pair<TableName, Path>> processedTableList = new ArrayList<Pair<TableName, Path>>();
+    boolean finishedTables = false;
+    Connection conn = ConnectionFactory.createConnection(getConf());
+    BackupSystemTable table = new BackupSystemTable(conn);
+    FileSystem fs = FileSystem.get(getConf());
+
+    try {
+
+      // Get exclusive lock on backup system
+      table.startBackupExclusiveOperation();
+      // Start merge operation
+      table.startMergeOperation(backupIds);
+
+      // Select most recent backup id
+      String mergedBackupId = findMostRecentBackupId(backupIds);
+
+      TableName[] tableNames = getTableNamesInBackupImages(backupIds);
+
+      BackupInfo bInfo = table.readBackupInfo(backupIds[0]);
+      String backupRoot = bInfo.getBackupRootDir();
+
+      for (int i = 0; i < tableNames.length; i++) {
+
+        LOG.info("Merge backup images for " + tableNames[i]);
+
+        // Find input directories for table
+
+        Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
+        String dirs = StringUtils.join(dirPaths, ",");
+        Path bulkOutputPath =
+            BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(tableNames[i]),
+              getConf(), false);
+        // Delete the output content if it already exists
+        if (fs.exists(bulkOutputPath)) {
+          if (!fs.delete(bulkOutputPath, true)) {
+            LOG.warn("Can not delete: " + bulkOutputPath);
+          }
+        }
+        Configuration conf = getConf();
+        conf.set(bulkOutputConfKey, bulkOutputPath.toString());
+        String[] playerArgs = { dirs, tableNames[i].getNameAsString() };
+
+        player.setConf(getConf());
+        int result = player.run(playerArgs);
+        if (!succeeded(result)) {
+          throw new IOException("Can not merge backup images for " + dirs
+              + " (check Hadoop/MR and HBase logs). Player return code =" + result);
+        }
+        // Add to processed table list
+        processedTableList.add(new Pair<TableName, Path>(tableNames[i], bulkOutputPath));
+        LOG.debug("Merge Job finished:" + result);
+      }
+      List<TableName> tableList = toTableNameList(processedTableList);
+      table.updateProcessedTablesForMerge(tableList);
+      finishedTables = true;
+
+      // Move data
+      for (Pair<TableName, Path> tn : processedTableList) {
+        moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
+      }
+
+      // Delete old data and update manifest
+      List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
+      deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
+      updateBackupManifest(backupRoot, mergedBackupId, backupsToDelete);
+      // Finish merge session
+      table.finishMergeOperation();
+      // Release lock
+      table.finishBackupExclusiveOperation();
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      LOG.error(e);
+      if (!finishedTables) {
+        // cleanup bulk directories and finish merge
+        // merge MUST be repeated (no need for repair)
+        cleanupBulkLoadDirs(fs, toPathList(processedTableList));
+        table.finishMergeOperation();
+        table.finishBackupExclusiveOperation();
+        throw new IOException("Backup merge operation failed, you should try it again", e);
+      } else {
+        // backup repair must be run
+        throw new IOException(
+            "Backup merge operation failed, run backup repair tool to restore system's integrity",
+            e);
+      }
+    } finally {
+      table.close();
+      conn.close();
+    }
+  }
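+
+  // Typical invocation (a sketch; the backup ids and configuration source are hypothetical):
+  //   BackupMergeJob mergeJob = new MapReduceBackupMergeJob();
+  //   mergeJob.setConf(destinationClusterConf);
+  //   mergeJob.run(new String[] { "backup_1503000000001", "backup_1503000000002" });
+  // The surviving image keeps the id returned by findMostRecentBackupId(); the other images
+  // are deleted after their data has been merged in.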
+
+  protected List<Path> toPathList(List<Pair<TableName, Path>> processedTableList) {
+    ArrayList<Path> list = new ArrayList<Path>();
+    for (Pair<TableName, Path> p : processedTableList) {
+      list.add(p.getSecond());
+    }
+    return list;
+  }
+
+  protected List<TableName> toTableNameList(List<Pair<TableName, Path>> processedTableList) {
+    ArrayList<TableName> list = new ArrayList<TableName>();
+    for (Pair<TableName, Path> p : processedTableList) {
+      list.add(p.getFirst());
+    }
+    return list;
+  }
+
+  protected void cleanupBulkLoadDirs(FileSystem fs, List<Path> pathList) throws IOException {
+    for (Path path : pathList) {
+
+      if (!fs.delete(path, true)) {
+        LOG.warn("Can't delete " + path);
+      }
+    }
+  }
+
+  protected void updateBackupManifest(String backupRoot, String mergedBackupId,
+      List<String> backupsToDelete) throws IllegalArgumentException, IOException {
+
+    BackupManifest manifest =
+        HBackupFileSystem.getManifest(conf, new Path(backupRoot), mergedBackupId);
+    manifest.getBackupImage().removeAncestors(backupsToDelete);
+    // save back
+    manifest.store(conf);
+
+  }
+
+  protected void deleteBackupImages(List<String> backupIds, Connection conn, FileSystem fs,
+      String backupRoot) throws IOException {
+
+    // Delete from backup system table
+    try (BackupSystemTable table = new BackupSystemTable(conn)) {
+      for (String backupId : backupIds) {
+        table.deleteBackupInfo(backupId);
+      }
+    }
+
+    // Delete from file system
+    for (String backupId : backupIds) {
+      Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, backupId);
+
+      if (!fs.delete(backupDirPath, true)) {
+        LOG.warn("Could not delete " + backupDirPath);
+      }
+    }
+  }
+
+  protected List<String> getBackupIdsToDelete(String[] backupIds, String mergedBackupId) {
+    List<String> list = new ArrayList<String>();
+    for (String id : backupIds) {
+      if (id.equals(mergedBackupId)) {
+        continue;
+      }
+      list.add(id);
+    }
+    return list;
+  }
+
+  protected void moveData(FileSystem fs, String backupRoot, Path bulkOutputPath, TableName tableName,
+      String mergedBackupId) throws IllegalArgumentException, IOException {
+
+    Path dest =
+        new Path(HBackupFileSystem.getTableBackupDataDir(backupRoot, mergedBackupId, tableName));
+
+    // Delete all in dest
+    if (!fs.delete(dest, true)) {
+      throw new IOException("Could not delete " + dest);
+    }
+
+    FileStatus[] fsts = fs.listStatus(bulkOutputPath);
+    for (FileStatus fst : fsts) {
+      if (fst.isDirectory()) {
+        fs.rename(fst.getPath().getParent(), dest);
+      }
+    }
+
+  }
+
+  protected String findMostRecentBackupId(String[] backupIds) {
+    long recentTimestamp = Long.MIN_VALUE;
+    for (String backupId : backupIds) {
+      long ts = Long.parseLong(backupId.split("_")[1]);
+      if (ts > recentTimestamp) {
+        recentTimestamp = ts;
+      }
+    }
+    return BackupRestoreConstants.BACKUPID_PREFIX + recentTimestamp;
+  }
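+
+  // Backup ids embed their creation timestamp after the first underscore, as parsed above;
+  // e.g. (illustrative values) for { "backup_1503000000001", "backup_1503000000002" } the most
+  // recent id, "backup_1503000000002", is returned.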
+
+  protected TableName[] getTableNamesInBackupImages(String[] backupIds) throws IOException {
+
+    Set<TableName> allSet = new HashSet<TableName>();
+
+    try (Connection conn = ConnectionFactory.createConnection(conf);
+        BackupSystemTable table = new BackupSystemTable(conn)) {
+      for (String backupId : backupIds) {
+        BackupInfo bInfo = table.readBackupInfo(backupId);
+
+        allSet.addAll(bInfo.getTableNames());
+      }
+    }
+
+    TableName[] ret = new TableName[allSet.size()];
+    return allSet.toArray(ret);
+  }
+
+  protected Path[] findInputDirectories(FileSystem fs, String backupRoot, TableName tableName,
+      String[] backupIds) throws IOException {
+
+    List<Path> dirs = new ArrayList<Path>();
+
+    for (String backupId : backupIds) {
+      Path fileBackupDirPath =
+          new Path(HBackupFileSystem.getTableBackupDataDir(backupRoot, backupId, tableName));
+      if (fs.exists(fileBackupDirPath)) {
+        dirs.add(fileBackupDirPath);
+      } else {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("File: " + fileBackupDirPath + " does not exist.");
+        }
+      }
+    }
+    Path[] ret = new Path[dirs.size()];
+    return dirs.toArray(ret);
+  }
+
+}