Posted to commits@hbase.apache.org by st...@apache.org on 2017/12/02 01:22:49 UTC

[06/10] hbase git commit: HBASE-19407 [branch-2] Remove backup/restore

http://git-wip-us.apache.org/repos/asf/hbase/blob/79ac70ac/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
deleted file mode 100644
index d9258c5..0000000
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
+++ /dev/null
@@ -1,387 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.backup.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.impl.BackupSystemTable.WALItem;
-import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
-import org.apache.hadoop.hbase.backup.util.BackupUtils;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-
-/**
- * After a full backup is created, an incremental backup stores only the changes made since the
- * last full or incremental backup. Creating the backup copies the log files in .logs and
- * .oldlogs written since the last backup timestamp.
- */
-@InterfaceAudience.Private
-public class IncrementalBackupManager extends BackupManager {
-  public static final Log LOG = LogFactory.getLog(IncrementalBackupManager.class);
-
-  public IncrementalBackupManager(Connection conn, Configuration conf) throws IOException {
-    super(conn, conf);
-  }
-
-  /**
-   * Obtain the list of logs that need to be copied out for this incremental backup. The list is set
-   * in BackupInfo.
-   * @return The new HashMap of RS log time stamps after the log roll for this incremental backup.
-   * @throws IOException exception
-   */
-  public HashMap<String, Long> getIncrBackupLogFileMap()
-      throws IOException {
-    List<String> logList;
-    HashMap<String, Long> newTimestamps;
-    HashMap<String, Long> previousTimestampMins;
-
-    String savedStartCode = readBackupStartCode();
-
-    // key: tableName
-    // value: <RegionServer,PreviousTimeStamp>
-    HashMap<TableName, HashMap<String, Long>> previousTimestampMap = readLogTimestampMap();
-
-    previousTimestampMins = BackupUtils.getRSLogTimestampMins(previousTimestampMap);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("StartCode " + savedStartCode + "for backupID " + backupInfo.getBackupId());
-    }
-    // get all new log files from .logs and .oldlogs after last TS and before new timestamp
-    if (savedStartCode == null || previousTimestampMins == null
-        || previousTimestampMins.isEmpty()) {
-      throw new IOException(
-          "Cannot read any previous back up timestamps from backup system table. "
-              + "In order to create an incremental backup, at least one full backup is needed.");
-    }
-
-    LOG.info("Execute roll log procedure for incremental backup ...");
-    HashMap<String, String> props = new HashMap<String, String>();
-    props.put("backupRoot", backupInfo.getBackupRootDir());
-
-    try (Admin admin = conn.getAdmin();) {
-
-      admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
-        LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
-
-    }
-    newTimestamps = readRegionServerLastLogRollResult();
-
-    logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
-    List<WALItem> logFromSystemTable =
-        getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps, getBackupInfo()
-            .getBackupRootDir());
-    logList = excludeAlreadyBackedUpWALs(logList, logFromSystemTable);
-    backupInfo.setIncrBackupFileList(logList);
-
-    return newTimestamps;
-  }
-
-  /**
-   * Get list of WAL files eligible for incremental backup
-   * @return list of WAL files
-   * @throws IOException
-   */
-  public List<String> getIncrBackupLogFileList()
-      throws IOException {
-    List<String> logList;
-    HashMap<String, Long> newTimestamps;
-    HashMap<String, Long> previousTimestampMins;
-
-    String savedStartCode = readBackupStartCode();
-
-    // key: tableName
-    // value: <RegionServer,PreviousTimeStamp>
-    HashMap<TableName, HashMap<String, Long>> previousTimestampMap = readLogTimestampMap();
-
-    previousTimestampMins = BackupUtils.getRSLogTimestampMins(previousTimestampMap);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("StartCode " + savedStartCode + "for backupID " + backupInfo.getBackupId());
-    }
-    // get all new log files from .logs and .oldlogs after last TS and before new timestamp
-    if (savedStartCode == null || previousTimestampMins == null
-        || previousTimestampMins.isEmpty()) {
-      throw new IOException(
-          "Cannot read any previous back up timestamps from backup system table. "
-              + "In order to create an incremental backup, at least one full backup is needed.");
-    }
-
-    newTimestamps = readRegionServerLastLogRollResult();
-
-    logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
-    List<WALItem> logFromSystemTable =
-        getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps, getBackupInfo()
-            .getBackupRootDir());
-
-    logList = excludeAlreadyBackedUpWALs(logList, logFromSystemTable);
-    backupInfo.setIncrBackupFileList(logList);
-
-    return logList;
-  }
-
-
-  private List<String> excludeAlreadyBackedUpWALs(List<String> logList,
-      List<WALItem> logFromSystemTable) {
-
-    Set<String> walFileNameSet = convertToSet(logFromSystemTable);
-
-    List<String> list = new ArrayList<String>();
-    for (int i=0; i < logList.size(); i++) {
-      Path p = new Path(logList.get(i));
-      String name  = p.getName();
-      if (walFileNameSet.contains(name)) continue;
-      list.add(logList.get(i));
-    }
-    return list;
-  }
-
-  /**
-   * Create a Set of WAL file names (not full path names).
-   * @param logFromSystemTable list of WAL items read from the backup system table
-   * @return set of WAL file names
-   */
-  private Set<String> convertToSet(List<WALItem> logFromSystemTable) {
-
-    Set<String> set = new HashSet<String>();
-    for (int i=0; i < logFromSystemTable.size(); i++) {
-      WALItem item = logFromSystemTable.get(i);
-      set.add(item.walFile);
-    }
-    return set;
-  }
-
-  /**
-   * For each region server: get all log files newer than the last timestamps, but not newer than
-   * the newest timestamps.
-   * @param olderTimestamps timestamp map for each region server of the last backup.
-   * @param newestTimestamps timestamp map for each region server that the backup should lead to.
-   * @param backupRoot backup root directory used to filter WAL entries from the backup system table
-   * @return list of log files which need to be added to this backup
-   * @throws IOException
-   */
-  private List<WALItem> getLogFilesFromBackupSystem(HashMap<String, Long> olderTimestamps,
-      HashMap<String, Long> newestTimestamps, String backupRoot) throws IOException {
-    List<WALItem> logFiles = new ArrayList<WALItem>();
-    Iterator<WALItem> it = getWALFilesFromBackupSystem();
-    while (it.hasNext()) {
-      WALItem item = it.next();
-      String rootDir = item.getBackupRoot();
-      if (!rootDir.equals(backupRoot)) {
-        continue;
-      }
-      String walFileName = item.getWalFile();
-      String server = BackupUtils.parseHostNameFromLogFile(new Path(walFileName));
-      if (server == null) {
-        continue;
-      }
-      Long tss = getTimestamp(walFileName);
-      Long oldTss = olderTimestamps.get(server);
-      Long newTss = newestTimestamps.get(server);
-      if (oldTss == null) {
-        logFiles.add(item);
-        continue;
-      }
-      if (newTss == null) {
-        newTss = Long.MAX_VALUE;
-      }
-      if (tss > oldTss && tss < newTss) {
-        logFiles.add(item);
-      }
-    }
-    return logFiles;
-  }
-
-  private Long getTimestamp(String walFileName) {
-    int index = walFileName.lastIndexOf(BackupUtils.LOGNAME_SEPARATOR);
-    return Long.parseLong(walFileName.substring(index + 1));
-  }
-
-  /**
-   * For each region server: get all log files newer than the last timestamps but not newer than the
-   * newest timestamps.
-   * @param olderTimestamps the timestamp for each region server of the last backup.
-   * @param newestTimestamps the timestamp for each region server that the backup should lead to.
-   * @param conf the Hadoop and HBase configuration
-   * @param savedStartCode the startcode (timestamp) of last successful backup.
-   * @return a list of log files to be backed up
-   * @throws IOException exception
-   */
-  private List<String> getLogFilesForNewBackup(HashMap<String, Long> olderTimestamps,
-      HashMap<String, Long> newestTimestamps, Configuration conf, String savedStartCode)
-      throws IOException {
-    LOG.debug("In getLogFilesForNewBackup()\n" + "olderTimestamps: " + olderTimestamps
-        + "\n newestTimestamps: " + newestTimestamps);
-    Path rootdir = FSUtils.getRootDir(conf);
-    Path logDir = new Path(rootdir, HConstants.HREGION_LOGDIR_NAME);
-    Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
-    FileSystem fs = rootdir.getFileSystem(conf);
-    NewestLogFilter pathFilter = new NewestLogFilter();
-
-    List<String> resultLogFiles = new ArrayList<String>();
-    List<String> newestLogs = new ArrayList<String>();
-
-    /*
-     * The region server and timestamp info we keep in the backup system table may be out of sync
-     * with the cluster if a new region server was added or an existing one was lost. We deal with
-     * that here when processing the logs. If the backup system table has more hosts, we just
-     * ignore them. If the .logs directory includes more hosts, the additional hosts have no old
-     * timestamps to compare with, so we simply use all the logs in those directories. We always
-     * write up-to-date region server and timestamp info to the backup system table at the end of
-     * a successful backup.
-     */
-
-    FileStatus[] rss;
-    Path p;
-    String host;
-    Long oldTimeStamp;
-    String currentLogFile;
-    long currentLogTS;
-
-    // Get the files in .logs.
-    rss = fs.listStatus(logDir);
-    for (FileStatus rs : rss) {
-      p = rs.getPath();
-      host = BackupUtils.parseHostNameFromLogFile(p);
-      if (host == null) {
-        continue;
-      }
-      FileStatus[] logs;
-      oldTimeStamp = olderTimestamps.get(host);
-      // It is possible that there is no old timestamp in the backup system table for this host if
-      // this region server was newly added after our last backup.
-      if (oldTimeStamp == null) {
-        logs = fs.listStatus(p);
-      } else {
-        pathFilter.setLastBackupTS(oldTimeStamp);
-        logs = fs.listStatus(p, pathFilter);
-      }
-      for (FileStatus log : logs) {
-        LOG.debug("currentLogFile: " + log.getPath().toString());
-        if (AbstractFSWALProvider.isMetaFile(log.getPath())) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Skip hbase:meta log file: " + log.getPath().getName());
-          }
-          continue;
-        }
-        currentLogFile = log.getPath().toString();
-        resultLogFiles.add(currentLogFile);
-        currentLogTS = BackupUtils.getCreationTime(log.getPath());
-        // newestTimestamps is up-to-date with the current list of hosts
-        // so newestTimestamps.get(host) will not be null.
-        if (currentLogTS > newestTimestamps.get(host)) {
-          newestLogs.add(currentLogFile);
-        }
-      }
-    }
-
-    // Include the .oldlogs files too.
-    FileStatus[] oldlogs = fs.listStatus(oldLogDir);
-    for (FileStatus oldlog : oldlogs) {
-      p = oldlog.getPath();
-      currentLogFile = p.toString();
-      if (AbstractFSWALProvider.isMetaFile(p)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Skip .meta log file: " + currentLogFile);
-        }
-        continue;
-      }
-      host = BackupUtils.parseHostFromOldLog(p);
-      if (host == null) {
-        continue;
-      }
-      currentLogTS = BackupUtils.getCreationTime(p);
-      oldTimeStamp = olderTimestamps.get(host);
-      /*
-       * It is possible that there is no old timestamp in the backup system table for this host;
-       * at the time of our last backup operation this region server did not exist. The reason can
-       * be one of two: 1. the region server had already left or crashed and its logs were moved
-       * to .oldlogs; 2. the region server was added after our last backup.
-       */
-      if (oldTimeStamp == null) {
-        if (currentLogTS < Long.parseLong(savedStartCode)) {
-          // This log file is really old, its region server was before our last backup.
-          continue;
-        } else {
-          resultLogFiles.add(currentLogFile);
-        }
-      } else if (currentLogTS > oldTimeStamp) {
-        resultLogFiles.add(currentLogFile);
-      }
-
-      // It is possible that a host in .oldlogs is an obsolete region server
-      // so newestTimestamps.get(host) here can be null.
-      // Even if these logs belong to an obsolete region server, we still need
-      // to include them to avoid losing edits in the backup.
-      Long newTimestamp = newestTimestamps.get(host);
-      if (newTimestamp != null && currentLogTS > newTimestamp) {
-        newestLogs.add(currentLogFile);
-      }
-    }
-    // remove newest log per host because they are still in use
-    resultLogFiles.removeAll(newestLogs);
-    return resultLogFiles;
-  }
-
-  static class NewestLogFilter implements PathFilter {
-    private Long lastBackupTS = 0L;
-
-    public NewestLogFilter() {
-    }
-
-    protected void setLastBackupTS(Long ts) {
-      this.lastBackupTS = ts;
-    }
-
-    @Override
-    public boolean accept(Path path) {
-      // skip meta table log -- ts.meta file
-      if (AbstractFSWALProvider.isMetaFile(path)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Skip .meta log file: " + path.getName());
-        }
-        return false;
-      }
-      long timestamp;
-      try {
-        timestamp = BackupUtils.getCreationTime(path);
-        return timestamp > lastBackupTS;
-      } catch (Exception e) {
-        LOG.warn("Cannot read timestamp of log file " + path);
-        return false;
-      }
-    }
-  }
-
-}
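
For reference, a minimal self-contained sketch of the timestamp-window rule used by
getLogFilesFromBackupSystem() above (getLogFilesForNewBackup() applies the same idea directly
against the .logs and .oldlogs directories, plus an extra start-code check for .oldlogs that is
omitted here). The names WalInfo and selectWalsForIncrement are hypothetical and are not part of
the removed class.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class WalSelectionSketch {
  // One WAL entry: the region server that wrote it, its full path, and the
  // creation timestamp parsed from the file name.
  static final class WalInfo {
    final String server;
    final String path;
    final long creationTs;
    WalInfo(String server, String path, long creationTs) {
      this.server = server;
      this.path = path;
      this.creationTs = creationTs;
    }
  }

  // A WAL qualifies for the incremental backup when its creation time falls after the
  // server's timestamp from the previous backup and before the post-roll timestamp.
  // A server unknown to the previous backup contributes all of its logs.
  static List<String> selectWalsForIncrement(List<WalInfo> wals,
      Map<String, Long> previousTs, Map<String, Long> newestTs) {
    List<String> result = new ArrayList<>();
    for (WalInfo wal : wals) {
      Long oldTs = previousTs.get(wal.server);
      long newTs = newestTs.getOrDefault(wal.server, Long.MAX_VALUE);
      if (oldTs == null || (wal.creationTs > oldTs && wal.creationTs < newTs)) {
        result.add(wal.path);
      }
    }
    return result;
  }
}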

http://git-wip-us.apache.org/repos/asf/hbase/blob/79ac70ac/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
deleted file mode 100644
index 00baeb7..0000000
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ /dev/null
@@ -1,404 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.backup.impl;
-
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.BackupCopyJob;
-import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
-import org.apache.hadoop.hbase.backup.BackupRequest;
-import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
-import org.apache.hadoop.hbase.backup.BackupType;
-import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
-import org.apache.hadoop.hbase.backup.util.BackupUtils;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.mapreduce.WALPlayer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.HFileArchiveUtil;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.util.Tool;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * Incremental backup implementation.
- * See the {@link #execute() execute} method.
- *
- */
-@InterfaceAudience.Private
-public class IncrementalTableBackupClient extends TableBackupClient {
-  private static final Log LOG = LogFactory.getLog(IncrementalTableBackupClient.class);
-
-  protected IncrementalTableBackupClient() {
-  }
-
-  public IncrementalTableBackupClient(final Connection conn, final String backupId,
-      BackupRequest request) throws IOException {
-    super(conn, backupId, request);
-  }
-
-  protected List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    List<String> list = new ArrayList<String>();
-    for (String file : incrBackupFileList) {
-      Path p = new Path(file);
-      if (fs.exists(p) || isActiveWalPath(p)) {
-        list.add(file);
-      } else {
-        LOG.warn("Can't find file: " + file);
-      }
-    }
-    return list;
-  }
-
-  /**
-   * Check if a given path belongs to the active WAL directory.
-   * @param p path
-   * @return true if it does
-   */
-  protected boolean isActiveWalPath(Path p) {
-    return !AbstractFSWALProvider.isArchivedLogFile(p);
-  }
-
-  protected static int getIndex(TableName tbl, List<TableName> sTableList) {
-    if (sTableList == null) return 0;
-    for (int i = 0; i < sTableList.size(); i++) {
-      if (tbl.equals(sTableList.get(i))) {
-        return i;
-      }
-    }
-    return -1;
-  }
-
-  /*
-   * Reads bulk load records from the backup table, iterates through the records, and forms the
-   * paths for the bulk-loaded HFiles. Copies the bulk-loaded HFiles to the backup destination.
-   * @param sTableList list of tables to be backed up
-   * @return map of table to List of files
-   */
-  protected Map<byte[], List<Path>>[] handleBulkLoad(List<TableName> sTableList) throws IOException {
-    Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()];
-    List<String> activeFiles = new ArrayList<String>();
-    List<String> archiveFiles = new ArrayList<String>();
-    Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair =
-    backupManager.readBulkloadRows(sTableList);
-    Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = pair.getFirst();
-    FileSystem fs = FileSystem.get(conf);
-    FileSystem tgtFs;
-    try {
-      tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
-    } catch (URISyntaxException use) {
-      throw new IOException("Unable to get FileSystem", use);
-    }
-    Path rootdir = FSUtils.getRootDir(conf);
-    Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);
-    for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> tblEntry :
-      map.entrySet()) {
-      TableName srcTable = tblEntry.getKey();
-
-      int srcIdx = getIndex(srcTable, sTableList);
-      if (srcIdx < 0) {
-        LOG.warn("Couldn't find " + srcTable + " in source table List");
-        continue;
-      }
-      if (mapForSrc[srcIdx] == null) {
-        mapForSrc[srcIdx] = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
-      }
-      Path tblDir = FSUtils.getTableDir(rootdir, srcTable);
-      Path tgtTable = new Path(new Path(tgtRoot, srcTable.getNamespaceAsString()),
-          srcTable.getQualifierAsString());
-      for (Map.Entry<String,Map<String,List<Pair<String, Boolean>>>> regionEntry :
-        tblEntry.getValue().entrySet()){
-        String regionName = regionEntry.getKey();
-        Path regionDir = new Path(tblDir, regionName);
-        // map from family to List of hfiles
-        for (Map.Entry<String,List<Pair<String, Boolean>>> famEntry :
-          regionEntry.getValue().entrySet()) {
-          String fam = famEntry.getKey();
-          Path famDir = new Path(regionDir, fam);
-          List<Path> files;
-          if (!mapForSrc[srcIdx].containsKey(fam.getBytes())) {
-            files = new ArrayList<Path>();
-            mapForSrc[srcIdx].put(fam.getBytes(), files);
-          } else {
-            files = mapForSrc[srcIdx].get(fam.getBytes());
-          }
-          Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
-          String tblName = srcTable.getQualifierAsString();
-          Path tgtFam = new Path(new Path(tgtTable, regionName), fam);
-          if (!tgtFs.mkdirs(tgtFam)) {
-            throw new IOException("couldn't create " + tgtFam);
-          }
-          for (Pair<String, Boolean> fileWithState : famEntry.getValue()) {
-            String file = fileWithState.getFirst();
-            int idx = file.lastIndexOf("/");
-            String filename = file;
-            if (idx > 0) {
-              filename = file.substring(idx+1);
-            }
-            Path p = new Path(famDir, filename);
-            Path tgt = new Path(tgtFam, filename);
-            Path archive = new Path(archiveDir, filename);
-            if (fs.exists(p)) {
-              if (LOG.isTraceEnabled()) {
-                LOG.trace("found bulk hfile " + file + " in " + famDir + " for " + tblName);
-                LOG.trace("copying " + p + " to " + tgt);
-              }
-              activeFiles.add(p.toString());
-            } else if (fs.exists(archive)) {
-              LOG.debug("copying archive " + archive + " to " + tgt);
-              archiveFiles.add(archive.toString());
-            }
-            files.add(tgt);
-          }
-        }
-      }
-    }
-
-    copyBulkLoadedFiles(activeFiles, archiveFiles);
-
-    backupManager.writeBulkLoadedFiles(sTableList, mapForSrc);
-    backupManager.removeBulkLoadedRows(sTableList, pair.getSecond());
-    return mapForSrc;
-  }
-
-  private void copyBulkLoadedFiles(List<String> activeFiles, List<String> archiveFiles)
-    throws IOException
-  {
-
-    try {
-      // Enable special mode of BackupDistCp
-      conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5);
-      // Copy active files
-      String tgtDest = backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId();
-      if (activeFiles.size() > 0) {
-        String[] toCopy = new String[activeFiles.size()];
-        activeFiles.toArray(toCopy);
-        incrementalCopyHFiles(toCopy, tgtDest);
-      }
-      if (archiveFiles.size() > 0) {
-        String[] toCopy = new String[archiveFiles.size()];
-        archiveFiles.toArray(toCopy);
-        incrementalCopyHFiles(toCopy, tgtDest);
-      }
-    } finally {
-      // Disable special mode of BackupDistCp
-      conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
-    }
-
-  }
-
-  @Override
-  public void execute() throws IOException {
-
-    try {
-      // case PREPARE_INCREMENTAL:
-      beginBackup(backupManager, backupInfo);
-      backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
-      LOG.debug("For incremental backup, current table set is "
-          + backupManager.getIncrementalBackupTableSet());
-      newTimestamps =
-          ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
-    } catch (Exception e) {
-      // fail the overall backup and return
-      failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
-        BackupType.INCREMENTAL, conf);
-      throw new IOException(e);
-    }
-
-    // case INCREMENTAL_COPY:
-    try {
-      // copy out the table and region info files for each table
-      BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
-      // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
-      convertWALsToHFiles();
-      incrementalCopyHFiles(new String[] {getBulkOutputDir().toString()}, backupInfo.getBackupRootDir());
-      // Save list of WAL files copied
-      backupManager.recordWALFiles(backupInfo.getIncrBackupFileList());
-    } catch (Exception e) {
-      String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId;
-      // fail the overall backup and return
-      failBackup(conn, backupInfo, backupManager, e, msg, BackupType.INCREMENTAL, conf);
-      throw new IOException(e);
-    }
-    // case INCR_BACKUP_COMPLETE:
-    // set overall backup status: complete. Here we make sure to complete the backup.
-    // After this checkpoint, even if a cancel is requested, the backup is allowed to finish.
-    try {
-      // Set the previousTimestampMap which is before this current log roll to the manifest.
-      HashMap<TableName, HashMap<String, Long>> previousTimestampMap =
-          backupManager.readLogTimestampMap();
-      backupInfo.setIncrTimestampMap(previousTimestampMap);
-
-      // The table list in backupInfo is good for both full backup and incremental backup.
-      // For incremental backup, it contains the incremental backup table set.
-      backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
-
-      HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
-          backupManager.readLogTimestampMap();
-
-      Long newStartCode =
-          BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
-      backupManager.writeBackupStartCode(newStartCode);
-
-      handleBulkLoad(backupInfo.getTableNames());
-      // backup complete
-      completeBackup(conn, backupInfo, backupManager, BackupType.INCREMENTAL, conf);
-
-    } catch (IOException e) {
-      failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
-        BackupType.INCREMENTAL, conf);
-      throw new IOException(e);
-    }
-  }
-
-  protected void incrementalCopyHFiles(String[] files, String backupDest) throws IOException {
-
-    try {
-      LOG.debug("Incremental copy HFiles is starting. dest="+backupDest);
-      // set overall backup phase: incremental_copy
-      backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
-      // get incremental backup file list and prepare params for DistCp
-      String[] strArr = new String[files.length + 1];
-      System.arraycopy(files, 0, strArr, 0, files.length);
-      strArr[strArr.length - 1] = backupDest;
-
-      String jobname = "Incremental_Backup-HFileCopy-" + backupInfo.getBackupId();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Setting incremental copy HFiles job name to : " + jobname);
-      }
-      conf.set(JOB_NAME_CONF_KEY, jobname);
-
-      BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
-      int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
-      if (res != 0) {
-        LOG.error("Copy incremental HFile files failed with return code: " + res + ".");
-        throw new IOException("Failed copy from " + StringUtils.join(files, ',')
-            + " to " + backupDest);
-      }
-      LOG.debug("Incremental copy HFiles from " + StringUtils.join(files, ',')
-          + " to " + backupDest + " finished.");
-    } finally {
-      deleteBulkLoadDirectory();
-    }
-  }
-
-  protected void deleteBulkLoadDirectory() throws IOException {
-    // delete original bulk load directory on method exit
-    Path path = getBulkOutputDir();
-    FileSystem fs = FileSystem.get(conf);
-    boolean result = fs.delete(path, true);
-    if (!result) {
-      LOG.warn("Could not delete " + path);
-    }
-
-  }
-
-  protected void convertWALsToHFiles() throws IOException {
-    // get incremental backup file list and prepare parameters for DistCp
-    List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
-    // Get list of tables in incremental backup set
-    Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
-    // filter missing files out (they have been copied by previous backups)
-    incrBackupFileList = filterMissingFiles(incrBackupFileList);
-    for (TableName table : tableSet) {
-      // Check if table exists
-      if (tableExists(table, conn)) {
-        walToHFiles(incrBackupFileList, table);
-      } else {
-        LOG.warn("Table " + table + " does not exists. Skipping in WAL converter");
-      }
-    }
-  }
-
-
-  protected boolean tableExists(TableName table, Connection conn) throws IOException {
-    try (Admin admin = conn.getAdmin();) {
-      return admin.tableExists(table);
-    }
-  }
-
-  protected void walToHFiles(List<String> dirPaths, TableName tableName) throws IOException {
-
-    Tool player = new WALPlayer();
-
-    // Player reads all files in arbitrary directory structure and creates
-    // a Map task for each file. We use ';' as separator
-    // because WAL file names contain ','
-    String dirs = StringUtils.join(dirPaths, ';');
-    String jobname = "Incremental_Backup-" + backupId + "-" + tableName.getNameAsString();
-
-    Path bulkOutputPath = getBulkOutputDirForTable(tableName);
-    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
-    conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
-    conf.set(JOB_NAME_CONF_KEY, jobname);
-    String[] playerArgs = { dirs, tableName.getNameAsString() };
-
-    try {
-      player.setConf(conf);
-      int result = player.run(playerArgs);
-      if(result != 0) {
-        throw new IOException("WAL Player failed");
-      }
-      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
-      conf.unset(JOB_NAME_CONF_KEY);
-    } catch (IOException e) {
-      throw e;
-    } catch (Exception ee) {
-      throw new IOException("Can not convert from directory " + dirs
-          + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee);
-    }
-  }
-
-  protected Path getBulkOutputDirForTable(TableName table) {
-    Path tablePath = getBulkOutputDir();
-    tablePath = new Path(tablePath, table.getNamespaceAsString());
-    tablePath = new Path(tablePath, table.getQualifierAsString());
-    return new Path(tablePath, "data");
-  }
-
-  protected Path getBulkOutputDir() {
-    String backupId = backupInfo.getBackupId();
-    Path path = new Path(backupInfo.getBackupRootDir());
-    path = new Path(path, ".tmp");
-    path = new Path(path, backupId);
-    return path;
-  }
-
-}
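
For context, the WAL-to-HFile conversion driven by walToHFiles() above boils down to configuring
and running WALPlayer with a bulk-output directory and ';' as the input separator (WAL file names
contain ','). The sketch below is standalone and illustrative only: the table name and output
path are placeholders, not values used by the removed client.

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;

public class WalToHFilesSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName table = TableName.valueOf("my_table");              // placeholder table
    Path bulkOutputDir = new Path("/backup/.tmp/backup_example");  // placeholder output dir
    // WAL file names contain ',', so ';' is used to join the input paths.
    String dirs = StringUtils.join(new String[] { "/hbase/oldWALs" }, ';');

    // Same WALPlayer settings the removed client sets before running the job.
    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputDir.toString());
    conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");

    WALPlayer player = new WALPlayer();
    player.setConf(conf);
    int rc = player.run(new String[] { dirs, table.getNameAsString() });
    if (rc != 0) {
      throw new RuntimeException("WALPlayer failed with exit code " + rc);
    }
  }
}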

http://git-wip-us.apache.org/repos/asf/hbase/blob/79ac70ac/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
deleted file mode 100644
index fc0fdde..0000000
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.backup.impl;
-
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.BackupType;
-import org.apache.hadoop.hbase.backup.HBackupFileSystem;
-import org.apache.hadoop.hbase.backup.RestoreRequest;
-import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
-import org.apache.hadoop.hbase.backup.util.BackupUtils;
-import org.apache.hadoop.hbase.backup.util.RestoreTool;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
-
-/**
- * Restore table implementation
- *
- */
-@InterfaceAudience.Private
-public class RestoreTablesClient {
-  private static final Log LOG = LogFactory.getLog(RestoreTablesClient.class);
-
-  private Configuration conf;
-  private Connection conn;
-  private String backupId;
-  private TableName[] sTableArray;
-  private TableName[] tTableArray;
-  private String targetRootDir;
-  private boolean isOverwrite;
-
-  public RestoreTablesClient(Connection conn, RestoreRequest request) throws IOException {
-    this.targetRootDir = request.getBackupRootDir();
-    this.backupId = request.getBackupId();
-    this.sTableArray = request.getFromTables();
-    this.tTableArray = request.getToTables();
-    if (tTableArray == null || tTableArray.length == 0) {
-      this.tTableArray = sTableArray;
-    }
-    this.isOverwrite = request.isOverwrite();
-    this.conn = conn;
-    this.conf = conn.getConfiguration();
-
-  }
-
-  /**
-   * Validate target tables.
-   * @param tTableArray target tables
-   * @param isOverwrite overwrite existing tables
-   * @throws IOException exception
-   */
-  private void checkTargetTables(TableName[] tTableArray, boolean isOverwrite) throws IOException {
-    ArrayList<TableName> existTableList = new ArrayList<>();
-    ArrayList<TableName> disabledTableList = new ArrayList<>();
-
-    // check if the tables already exist
-    try (Admin admin = conn.getAdmin();) {
-      for (TableName tableName : tTableArray) {
-        if (admin.tableExists(tableName)) {
-          existTableList.add(tableName);
-          if (admin.isTableDisabled(tableName)) {
-            disabledTableList.add(tableName);
-          }
-        } else {
-          LOG.info("HBase table " + tableName
-              + " does not exist. It will be created during restore process");
-        }
-      }
-    }
-
-    if (existTableList.size() > 0) {
-      if (!isOverwrite) {
-        LOG.error("Existing table (" + existTableList
-            + ") found in the restore target, please add "
-            + "\"-o\" as overwrite option in the command if you mean"
-            + " to restore to these existing tables");
-        throw new IOException("Existing table found in target while no \"-o\" "
-            + "as overwrite option found");
-      } else {
-        if (disabledTableList.size() > 0) {
-          LOG.error("Found offline table in the restore target, "
-              + "please enable them before restore with \"-overwrite\" option");
-          LOG.info("Offline table list in restore target: " + disabledTableList);
-          throw new IOException(
-              "Found offline table in the target when restore with \"-overwrite\" option");
-        }
-      }
-    }
-  }
-
-  /**
-   * Restore operation: handle each BackupImage in the array.
-   * @param images array of BackupImage
-   * @param sTable table to be restored
-   * @param tTable table to restore to
-   * @param truncateIfExists truncate the table if it exists
-   * @throws IOException exception
-   */
-
-  private void restoreImages(BackupImage[] images, TableName sTable, TableName tTable,
-      boolean truncateIfExists) throws IOException {
-
-    // First image MUST be image of a FULL backup
-    BackupImage image = images[0];
-    String rootDir = image.getRootDir();
-    String backupId = image.getBackupId();
-    Path backupRoot = new Path(rootDir);
-    RestoreTool restoreTool = new RestoreTool(conf, backupRoot, backupId);
-    Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId);
-    String lastIncrBackupId = images.length == 1 ? null : images[images.length - 1].getBackupId();
-    // We need hFS only for full restore (see the code)
-    BackupManifest manifest = HBackupFileSystem.getManifest(conf, backupRoot, backupId);
-    if (manifest.getType() == BackupType.FULL) {
-      LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full" + " backup image "
-          + tableBackupPath.toString());
-      conf.set(JOB_NAME_CONF_KEY, "Full_Restore-" + backupId + "-" + tTable);
-      restoreTool.fullRestoreTable(conn, tableBackupPath, sTable, tTable, truncateIfExists,
-        lastIncrBackupId);
-      conf.unset(JOB_NAME_CONF_KEY);
-    } else { // incremental Backup
-      throw new IOException("Unexpected backup type " + image.getType());
-    }
-
-    if (images.length == 1) {
-      // full backup restore done
-      return;
-    }
-
-    List<Path> dirList = new ArrayList<Path>();
-    // add full backup path
-    // full backup path comes first
-    for (int i = 1; i < images.length; i++) {
-      BackupImage im = images[i];
-      String fileBackupDir =
-          HBackupFileSystem.getTableBackupDataDir(im.getRootDir(), im.getBackupId(), sTable);
-      dirList.add(new Path(fileBackupDir));
-    }
-
-    String dirs = StringUtils.join(dirList, ",");
-    LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from log dirs: " + dirs);
-    Path[] paths = new Path[dirList.size()];
-    dirList.toArray(paths);
-    conf.set(JOB_NAME_CONF_KEY, "Incremental_Restore-" + backupId + "-" + tTable);
-    restoreTool.incrementalRestoreTable(conn, tableBackupPath, paths, new TableName[] { sTable },
-      new TableName[] { tTable }, lastIncrBackupId);
-    LOG.info(sTable + " has been successfully restored to " + tTable);
-  }
-
-  /**
-   * Restore operation. Stage 2: resolve backup image dependencies.
-   * @param backupManifestMap map of tableName to BackupManifest
-   * @param sTableArray the array of tables to be restored
-   * @param tTableArray the array of tables to restore to
-   * @param isOverwrite whether to overwrite existing tables
-   * @throws IOException exception
-   */
-  private void restore(HashMap<TableName, BackupManifest> backupManifestMap,
-      TableName[] sTableArray, TableName[] tTableArray, boolean isOverwrite) throws IOException {
-    TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>();
-    boolean truncateIfExists = isOverwrite;
-    Set<String> backupIdSet = new HashSet<>();
-
-    for (int i = 0; i < sTableArray.length; i++) {
-      TableName table = sTableArray[i];
-
-      BackupManifest manifest = backupManifestMap.get(table);
-      // Get the image list of this backup for restore in time order from old
-      // to new.
-      List<BackupImage> list = new ArrayList<BackupImage>();
-      list.add(manifest.getBackupImage());
-      TreeSet<BackupImage> set = new TreeSet<BackupImage>(list);
-      List<BackupImage> depList = manifest.getDependentListByTable(table);
-      set.addAll(depList);
-      BackupImage[] arr = new BackupImage[set.size()];
-      set.toArray(arr);
-      restoreImages(arr, table, tTableArray[i], truncateIfExists);
-      restoreImageSet.addAll(list);
-      if (restoreImageSet != null && !restoreImageSet.isEmpty()) {
-        LOG.info("Restore includes the following image(s):");
-        for (BackupImage image : restoreImageSet) {
-          LOG.info("Backup: " + image.getBackupId() + " "
-              + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), table));
-          if (image.getType() == BackupType.INCREMENTAL) {
-            backupIdSet.add(image.getBackupId());
-            LOG.debug("adding " + image.getBackupId() + " for bulk load");
-          }
-        }
-      }
-    }
-    try (BackupSystemTable table = new BackupSystemTable(conn)) {
-      List<TableName> sTableList = Arrays.asList(sTableArray);
-      for (String id : backupIdSet) {
-        LOG.debug("restoring bulk load for " + id);
-        Map<byte[], List<Path>>[] mapForSrc = table.readBulkLoadedFiles(id, sTableList);
-        Map<LoadQueueItem, ByteBuffer> loaderResult;
-        conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
-        LoadIncrementalHFiles loader = BackupUtils.createLoader(conf);
-        for (int i = 0; i < sTableList.size(); i++) {
-          if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) {
-            loaderResult = loader.run(mapForSrc[i], tTableArray[i]);
-            LOG.debug("bulk loading " + sTableList.get(i) + " to " + tTableArray[i]);
-            if (loaderResult.isEmpty()) {
-              String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " + tTableArray[i];
-              LOG.error(msg);
-              throw new IOException(msg);
-            }
-          }
-        }
-      }
-    }
-    LOG.debug("restoreStage finished");
-  }
-
-  static long getTsFromBackupId(String backupId) {
-    if (backupId == null) {
-      return 0;
-    }
-    return Long.parseLong(backupId.substring(backupId.lastIndexOf("_") + 1));
-  }
-
-  static boolean withinRange(long a, long lower, long upper) {
-    if (a < lower || a > upper) {
-      return false;
-    }
-    return true;
-  }
-
-  public void execute() throws IOException {
-
-    // case VALIDATION:
-    // check the target tables
-    checkTargetTables(tTableArray, isOverwrite);
-
-    // case RESTORE_IMAGES:
-    HashMap<TableName, BackupManifest> backupManifestMap = new HashMap<>();
-    // check and load backup image manifest for the tables
-    Path rootPath = new Path(targetRootDir);
-    HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath,
-      backupId);
-
-    restore(backupManifestMap, sTableArray, tTableArray, isOverwrite);
-  }
-
-}
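
The restore path above relies on an ordering invariant for the backup images of a table: the
first image must come from a FULL backup, and the remaining images are INCREMENTAL, applied from
oldest to newest (the timestamp is the numeric suffix of the backup id, as in getTsFromBackupId()).
A small hypothetical validation sketch of that invariant, assuming the BackupImage accessors shown
in the removed code:

package org.apache.hadoop.hbase.backup.impl;

import java.util.List;

import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;

final class RestoreSequenceSketch {

  // Timestamp suffix of a backup id, e.g. a hypothetical "backup_1512176000000" -> 1512176000000L.
  static long tsOf(BackupImage image) {
    String id = image.getBackupId();
    return Long.parseLong(id.substring(id.lastIndexOf("_") + 1));
  }

  // Throws if the image list does not start with a FULL image followed by
  // INCREMENTAL images in old-to-new order.
  static void validateRestoreSequence(List<BackupImage> images) {
    if (images.isEmpty() || images.get(0).getType() != BackupType.FULL) {
      throw new IllegalStateException("First image must be a FULL backup image");
    }
    long prevTs = tsOf(images.get(0));
    for (int i = 1; i < images.size(); i++) {
      BackupImage im = images.get(i);
      long ts = tsOf(im);
      if (im.getType() != BackupType.INCREMENTAL || ts < prevTs) {
        throw new IllegalStateException("Images must be INCREMENTAL and in time order");
      }
      prevTs = ts;
    }
  }
}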

http://git-wip-us.apache.org/repos/asf/hbase/blob/79ac70ac/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
deleted file mode 100644
index aa0ec5f..0000000
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
+++ /dev/null
@@ -1,436 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.backup.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.BackupInfo;
-import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
-import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
-import org.apache.hadoop.hbase.backup.BackupRequest;
-import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
-import org.apache.hadoop.hbase.backup.BackupType;
-import org.apache.hadoop.hbase.backup.HBackupFileSystem;
-import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-
-/**
- * Base class for backup operations. Concrete implementations for full and incremental backups
- * are delegated to the corresponding sub-classes:
- * {@link FullTableBackupClient} and {@link IncrementalTableBackupClient}.
- *
- */
-@InterfaceAudience.Private
-public abstract class TableBackupClient {
-
-  public static final String BACKUP_CLIENT_IMPL_CLASS = "backup.client.impl.class";
-
-  @VisibleForTesting
-  public static final String BACKUP_TEST_MODE_STAGE = "backup.test.mode.stage";
-
-  private static final Log LOG = LogFactory.getLog(TableBackupClient.class);
-
-  protected Configuration conf;
-  protected Connection conn;
-  protected String backupId;
-  protected List<TableName> tableList;
-  protected HashMap<String, Long> newTimestamps = null;
-
-  protected BackupManager backupManager;
-  protected BackupInfo backupInfo;
-
-  public TableBackupClient() {
-  }
-
-  public TableBackupClient(final Connection conn, final String backupId, BackupRequest request)
-      throws IOException {
-    init(conn, backupId, request);
-  }
-
-  public void init(final Connection conn, final String backupId, BackupRequest request)
-      throws IOException
-  {
-    if (request.getBackupType() == BackupType.FULL) {
-      backupManager = new BackupManager(conn, conn.getConfiguration());
-    } else {
-      backupManager = new IncrementalBackupManager(conn, conn.getConfiguration());
-    }
-    this.backupId = backupId;
-    this.tableList = request.getTableList();
-    this.conn = conn;
-    this.conf = conn.getConfiguration();
-    backupInfo =
-        backupManager.createBackupInfo(backupId, request.getBackupType(), tableList,
-          request.getTargetRootDir(), request.getTotalTasks(), request.getBandwidth());
-    if (tableList == null || tableList.isEmpty()) {
-      this.tableList = new ArrayList<>(backupInfo.getTables());
-    }
-    // Start new session
-    backupManager.startBackupSession();
-  }
-
-  /**
-   * Begin the overall backup.
-   * @param backupInfo backup info
-   * @throws IOException exception
-   */
-  protected void beginBackup(BackupManager backupManager, BackupInfo backupInfo)
-      throws IOException {
-
-    BackupSystemTable.snapshot(conn);
-    backupManager.setBackupInfo(backupInfo);
-    // set the start timestamp of the overall backup
-    long startTs = EnvironmentEdgeManager.currentTime();
-    backupInfo.setStartTs(startTs);
-    // set overall backup status: ongoing
-    backupInfo.setState(BackupState.RUNNING);
-    backupInfo.setPhase(BackupPhase.REQUEST);
-    LOG.info("Backup " + backupInfo.getBackupId() + " started at " + startTs + ".");
-
-    backupManager.updateBackupInfo(backupInfo);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Backup session " + backupInfo.getBackupId() + " has been started.");
-    }
-  }
-
-  protected String getMessage(Exception e) {
-    String msg = e.getMessage();
-    if (msg == null || msg.equals("")) {
-      msg = e.getClass().getName();
-    }
-    return msg;
-  }
-
-  /**
-   * Delete HBase snapshot for backup.
-   * @param backupInfo backup info
-   * @throws IOException exception
-   */
-  protected static void deleteSnapshots(final Connection conn, BackupInfo backupInfo, Configuration conf)
-      throws IOException {
-    LOG.debug("Trying to delete snapshot for full backup.");
-    for (String snapshotName : backupInfo.getSnapshotNames()) {
-      if (snapshotName == null) {
-        continue;
-      }
-      LOG.debug("Trying to delete snapshot: " + snapshotName);
-
-      try (Admin admin = conn.getAdmin();) {
-        admin.deleteSnapshot(snapshotName);
-      }
-      LOG.debug("Deleting the snapshot " + snapshotName + " for backup " + backupInfo.getBackupId()
-          + " succeeded.");
-    }
-  }
-
-  /**
-   * Clean up directories with prefix "exportSnapshot-", which are generated when exporting
-   * snapshots.
-   * @throws IOException exception
-   */
-  protected static void cleanupExportSnapshotLog(Configuration conf) throws IOException {
-    FileSystem fs = FSUtils.getCurrentFileSystem(conf);
-    Path stagingDir =
-        new Path(conf.get(BackupRestoreConstants.CONF_STAGING_ROOT, fs.getWorkingDirectory()
-            .toString()));
-    FileStatus[] files = FSUtils.listStatus(fs, stagingDir);
-    if (files == null) {
-      return;
-    }
-    for (FileStatus file : files) {
-      if (file.getPath().getName().startsWith("exportSnapshot-")) {
-        LOG.debug("Delete log files of exporting snapshot: " + file.getPath().getName());
-        if (!FSUtils.delete(fs, file.getPath(), true)) {
-          LOG.warn("Can not delete " + file.getPath());
-        }
-      }
-    }
-  }
-
-  /**
-   * Clean up the uncompleted data at target directory if the ongoing backup has already entered
-   * the copy phase.
-   */
-  protected static void cleanupTargetDir(BackupInfo backupInfo, Configuration conf) {
-    try {
-      // clean up the uncompleted data at target directory if the ongoing backup has already entered
-      // the copy phase
-      LOG.debug("Trying to cleanup up target dir. Current backup phase: "
-          + backupInfo.getPhase());
-      if (backupInfo.getPhase().equals(BackupPhase.SNAPSHOTCOPY)
-          || backupInfo.getPhase().equals(BackupPhase.INCREMENTAL_COPY)
-          || backupInfo.getPhase().equals(BackupPhase.STORE_MANIFEST)) {
-        FileSystem outputFs =
-            FileSystem.get(new Path(backupInfo.getBackupRootDir()).toUri(), conf);
-
-        // now treat one backup as a transaction, clean up data that has been partially copied at
-        // table level
-        for (TableName table : backupInfo.getTables()) {
-          Path targetDirPath =
-              new Path(HBackupFileSystem.getTableBackupDir(backupInfo.getBackupRootDir(),
-                backupInfo.getBackupId(), table));
-          if (outputFs.delete(targetDirPath, true)) {
-            LOG.debug("Cleaning up uncompleted backup data at " + targetDirPath.toString()
-                + " done.");
-          } else {
-            LOG.debug("No data has been copied to " + targetDirPath.toString() + ".");
-          }
-
-          Path tableDir = targetDirPath.getParent();
-          FileStatus[] backups = FSUtils.listStatus(outputFs, tableDir);
-          if (backups == null || backups.length == 0) {
-            outputFs.delete(tableDir, true);
-            LOG.debug(tableDir.toString() + " is empty, remove it.");
-          }
-        }
-      }
-
-    } catch (IOException e1) {
-      LOG.error("Cleaning up uncompleted backup data of " + backupInfo.getBackupId() + " at "
-          + backupInfo.getBackupRootDir() + " failed due to " + e1.getMessage() + ".");
-    }
-  }
-
-  /**
-   * Fail the overall backup.
-   * @param backupInfo backup info
-   * @param e exception
-   * @throws IOException exception
-   */
-  protected void failBackup(Connection conn, BackupInfo backupInfo, BackupManager backupManager,
-      Exception e, String msg, BackupType type, Configuration conf) throws IOException {
-
-    try {
-      LOG.error(msg + getMessage(e), e);
-      // If this is a cancel exception, then we've already cleaned.
-      // set the failure timestamp of the overall backup
-      backupInfo.setCompleteTs(EnvironmentEdgeManager.currentTime());
-      // set failure message
-      backupInfo.setFailedMsg(e.getMessage());
-      // set overall backup status: failed
-      backupInfo.setState(BackupState.FAILED);
-      // compose the backup failed data
-      String backupFailedData =
-          "BackupId=" + backupInfo.getBackupId() + ",startts=" + backupInfo.getStartTs()
-              + ",failedts=" + backupInfo.getCompleteTs() + ",failedphase=" + backupInfo.getPhase()
-              + ",failedmessage=" + backupInfo.getFailedMsg();
-      LOG.error(backupFailedData);
-      cleanupAndRestoreBackupSystem(conn, backupInfo, conf);
-      // If the backup session is updated to FAILED state, it means we have
-      // already processed recovery.
-      backupManager.updateBackupInfo(backupInfo);
-      backupManager.finishBackupSession();
-      LOG.error("Backup " + backupInfo.getBackupId() + " failed.");
-    } catch (IOException ee) {
-      LOG.error("Please run backup repair tool manually to restore backup system integrity");
-      throw ee;
-    }
-  }
-
-  public static void cleanupAndRestoreBackupSystem (Connection conn, BackupInfo backupInfo,
-      Configuration conf) throws IOException
-  {
-    BackupType type = backupInfo.getType();
-     // if this is a full backup, delete HBase snapshots if any were already taken,
-     // and also clean up export snapshot log files if they exist
-     if (type == BackupType.FULL) {
-       deleteSnapshots(conn, backupInfo, conf);
-       cleanupExportSnapshotLog(conf);
-     }
-     BackupSystemTable.restoreFromSnapshot(conn);
-     BackupSystemTable.deleteSnapshot(conn);
-     // clean up the uncompleted data at target directory if the ongoing backup has already entered
-     // the copy phase
-     // For incremental backup, DistCp logs will be cleaned with the targetDir.
-     cleanupTargetDir(backupInfo, conf);
-  }
-
-
-
-  /**
-   * Add a manifest for the current backup. The manifest is stored within the table backup directory.
-   * @param backupInfo The current backup info
-   * @throws IOException exception
-   * @throws BackupException exception
-   */
-  protected void addManifest(BackupInfo backupInfo, BackupManager backupManager, BackupType type,
-      Configuration conf) throws IOException, BackupException {
-    // set the overall backup phase : store manifest
-    backupInfo.setPhase(BackupPhase.STORE_MANIFEST);
-
-    BackupManifest manifest;
-
-    // Since we have each table's backup in its own directory structure,
-    // we'll store its manifest with the table directory.
-    for (TableName table : backupInfo.getTables()) {
-      manifest = new BackupManifest(backupInfo, table);
-      ArrayList<BackupImage> ancestors = backupManager.getAncestors(backupInfo, table);
-      for (BackupImage image : ancestors) {
-        manifest.addDependentImage(image);
-      }
-
-      if (type == BackupType.INCREMENTAL) {
-        // We'll store the log timestamps for this table only in its manifest.
-        HashMap<TableName, HashMap<String, Long>> tableTimestampMap =
-            new HashMap<TableName, HashMap<String, Long>>();
-        tableTimestampMap.put(table, backupInfo.getIncrTimestampMap().get(table));
-        manifest.setIncrTimestampMap(tableTimestampMap);
-        ArrayList<BackupImage> ancestorss = backupManager.getAncestors(backupInfo);
-        for (BackupImage image : ancestorss) {
-          manifest.addDependentImage(image);
-        }
-      }
-      manifest.store(conf);
-    }
-
-    // For incremental backup, we store an overall manifest in
-    // <backup-root-dir>/WALs/<backup-id>.
-    // This is used when creating the next incremental backup.
-    if (type == BackupType.INCREMENTAL) {
-      manifest = new BackupManifest(backupInfo);
-      // set the table region server start and end timestamps for incremental backup
-      manifest.setIncrTimestampMap(backupInfo.getIncrTimestampMap());
-      ArrayList<BackupImage> ancestors = backupManager.getAncestors(backupInfo);
-      for (BackupImage image : ancestors) {
-        manifest.addDependentImage(image);
-      }
-      manifest.store(conf);
-    }
-  }
-
-  /**
-   * Get the backup request metadata as a string.
-   * @param backupInfo backup info
-   * @return the backup metadata string
-   */
-  protected String obtainBackupMetaDataStr(BackupInfo backupInfo) {
-    StringBuffer sb = new StringBuffer();
-    sb.append("type=" + backupInfo.getType() + ",tablelist=");
-    for (TableName table : backupInfo.getTables()) {
-      sb.append(table + ";");
-    }
-    if (sb.lastIndexOf(";") > 0) {
-      sb.delete(sb.lastIndexOf(";"), sb.lastIndexOf(";") + 1);
-    }
-    sb.append(",targetRootDir=" + backupInfo.getBackupRootDir());
-
-    return sb.toString();
-  }
-
-  /**
-   * Clean up directories with the prefix "_distcp_logs-", which are generated when DistCp copies
-   * hlogs.
-   * @throws IOException exception
-   */
-  protected void cleanupDistCpLog(BackupInfo backupInfo, Configuration conf) throws IOException {
-    Path rootPath = new Path(backupInfo.getHLogTargetDir()).getParent();
-    FileSystem fs = FileSystem.get(rootPath.toUri(), conf);
-    FileStatus[] files = FSUtils.listStatus(fs, rootPath);
-    if (files == null) {
-      return;
-    }
-    for (FileStatus file : files) {
-      if (file.getPath().getName().startsWith("_distcp_logs")) {
-        LOG.debug("Delete log files of DistCp: " + file.getPath().getName());
-        FSUtils.delete(fs, file.getPath(), true);
-      }
-    }
-  }
-
-  /**
-   * Complete the overall backup.
-   * @param backupInfo backup info
-   * @throws IOException if storing the manifest or updating the backup system table fails
-   */
-  protected void completeBackup(final Connection conn, BackupInfo backupInfo,
-      BackupManager backupManager, BackupType type, Configuration conf) throws IOException {
-    // set the complete timestamp of the overall backup
-    backupInfo.setCompleteTs(EnvironmentEdgeManager.currentTime());
-    // set overall backup status: complete
-    backupInfo.setState(BackupState.COMPLETE);
-    backupInfo.setProgress(100);
-    // add and store the manifest for the backup
-    addManifest(backupInfo, backupManager, type, conf);
-
-    // compose the backup complete data
-    String backupCompleteData =
-        obtainBackupMetaDataStr(backupInfo) + ",startts=" + backupInfo.getStartTs()
-            + ",completets=" + backupInfo.getCompleteTs() + ",bytescopied="
-            + backupInfo.getTotalBytesCopied();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Backup " + backupInfo.getBackupId() + " finished: " + backupCompleteData);
-    }
-
-    // when full backup is done:
-    // - delete HBase snapshot
-    // - clean up directories with prefix "exportSnapshot-", which are generated when exporting
-    // snapshots
-    if (type == BackupType.FULL) {
-      deleteSnapshots(conn, backupInfo, conf);
-      cleanupExportSnapshotLog(conf);
-    } else if (type == BackupType.INCREMENTAL) {
-      cleanupDistCpLog(backupInfo, conf);
-    }
-    BackupSystemTable.deleteSnapshot(conn);
-    backupManager.updateBackupInfo(backupInfo);
-
-    // Finish active session
-    backupManager.finishBackupSession();
-
-    LOG.info("Backup " + backupInfo.getBackupId() + " completed.");
-  }
-
-  /**
-   * Backup request execution.
-   * @throws IOException if the backup fails
-   */
-  public abstract void execute() throws IOException;
-
-  @VisibleForTesting
-  protected Stage getTestStage() {
-    return Stage.valueOf("stage_" + conf.getInt(BACKUP_TEST_MODE_STAGE, 0));
-  }
-
-  @VisibleForTesting
-  protected void failStageIf(Stage stage) throws IOException {
-    Stage current = getTestStage();
-    if (current == stage) {
-      throw new IOException("Failed stage " + stage + " in testing");
-    }
-  }
-
-  public static enum Stage {
-    stage_0, stage_1, stage_2, stage_3, stage_4
-  }
-}
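
For reference, the table-level cleanup that the removed cleanupTargetDir() performs can be
illustrated with a minimal standalone sketch. The class and method names below
(PartialBackupCleaner, cleanTableBackupDir) are invented for the illustration, and it uses the
plain FileSystem API rather than HBase's FSUtils helpers; it is a sketch of the behavior, not the
removed implementation.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class PartialBackupCleaner {
      private PartialBackupCleaner() {
      }

      /**
       * Deletes a partially copied table backup directory and, if that leaves the
       * parent table directory empty, removes the parent directory as well.
       */
      static void cleanTableBackupDir(Configuration conf, Path tableBackupDir) throws IOException {
        FileSystem fs = FileSystem.get(tableBackupDir.toUri(), conf);
        // remove whatever was copied for this table before the backup failed;
        // delete() returns false when nothing had been copied yet
        fs.delete(tableBackupDir, true);
        // if the parent table directory is now empty, remove it too
        Path tableDir = tableBackupDir.getParent();
        if (fs.exists(tableDir) && fs.listStatus(tableDir).length == 0) {
          fs.delete(tableDir, true);
        }
      }
    }

The parent table directory is removed only when deleting the partial copy leaves it empty,
mirroring the "one backup as a transaction" cleanup in the removed code above.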

http://git-wip-us.apache.org/repos/asf/hbase/blob/79ac70ac/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
deleted file mode 100644
index 90e8442..0000000
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
+++ /dev/null
@@ -1,438 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.backup.mapreduce;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.math.BigDecimal;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.BackupCopyJob;
-import org.apache.hadoop.hbase.backup.BackupInfo;
-import org.apache.hadoop.hbase.backup.BackupType;
-import org.apache.hadoop.hbase.backup.impl.BackupManager;
-import org.apache.hadoop.hbase.backup.util.BackupUtils;
-import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Cluster;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.tools.CopyListingFileStatus;
-import org.apache.hadoop.tools.DistCp;
-import org.apache.hadoop.tools.DistCpConstants;
-import org.apache.hadoop.tools.DistCpOptions;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-
-/**
- * Map-Reduce implementation of {@link BackupCopyJob}. There are two types of copy operation:
- * copying from a snapshot, which is based on extending ExportSnapshot, and copying incremental
- * log files, which is based on extending DistCp.
- */
-@InterfaceAudience.Private
-public class MapReduceBackupCopyJob implements BackupCopyJob {
-  public static final String NUMBER_OF_LEVELS_TO_PRESERVE_KEY = "num.levels.preserve";
-  private static final Log LOG = LogFactory.getLog(MapReduceBackupCopyJob.class);
-
-  private Configuration conf;
-
-  // Accumulated progress within the whole backup process for the copy operation
-  private float progressDone = 0.1f;
-  private long bytesCopied = 0;
-  private static final float INIT_PROGRESS = 0.1f;
-
-  // The percentage of the current copy task within the whole task if multiple copies are
-  // needed. The default value is 100%, which means there is only one copy task for the whole backup.
-  private float subTaskPercntgInWholeTask = 1f;
-
-  public MapReduceBackupCopyJob() {
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-  /**
-   * Get the current copy task percentage within the whole task if multiple copies are needed.
-   * @return the current copy task percentage
-   */
-  public float getSubTaskPercntgInWholeTask() {
-    return subTaskPercntgInWholeTask;
-  }
-
-  /**
-   * Set the current copy task percentage within the whole task if multiple copies are needed. Must
-   * be called before calling
-   * {@link #copy(BackupInfo, BackupManager, Configuration, BackupType, String[])}
-   * @param subTaskPercntgInWholeTask The percentage of the copy subtask
-   */
-  public void setSubTaskPercntgInWholeTask(float subTaskPercntgInWholeTask) {
-    this.subTaskPercntgInWholeTask = subTaskPercntgInWholeTask;
-  }
-
-  static class SnapshotCopy extends ExportSnapshot {
-    private BackupInfo backupInfo;
-    private TableName table;
-
-    public SnapshotCopy(BackupInfo backupInfo, TableName table) {
-      super();
-      this.backupInfo = backupInfo;
-      this.table = table;
-    }
-
-    public TableName getTable() {
-      return this.table;
-    }
-
-    public BackupInfo getBackupInfo() {
-      return this.backupInfo;
-    }
-  }
-
-  /**
-   * Update the ongoing backup with new progress.
-   * @param backupInfo backup info
-   * @param newProgress progress
-   * @param bytesCopied bytes copied
-   * @throws IOException if the backup system table update fails
-   */
-  static void updateProgress(BackupInfo backupInfo, BackupManager backupManager,
-      int newProgress, long bytesCopied) throws IOException {
-    // compose the new backup progress data, using fake number for now
-    String backupProgressData = newProgress + "%";
-
-    backupInfo.setProgress(newProgress);
-    backupManager.updateBackupInfo(backupInfo);
-    LOG.debug("Backup progress data \"" + backupProgressData
-        + "\" has been updated to backup system table for " + backupInfo.getBackupId());
-  }
-
-  /**
-   * Extends DistCp to update copy progress in the backup system table
-   * during backup. Uses DistCpV2 (MAPREDUCE-2765): we simply extend it and
-   * override the execute() method to get the Job reference for progress
-   * updates.
-   * Only the arguments "src1, [src2, [...]] dst" are supported;
-   * no other DistCp options are accepted.
-   */
-
-  class BackupDistCp extends DistCp {
-
-    private BackupInfo backupInfo;
-    private BackupManager backupManager;
-
-    public BackupDistCp(Configuration conf, DistCpOptions options, BackupInfo backupInfo,
-        BackupManager backupManager) throws Exception {
-      super(conf, options);
-      this.backupInfo = backupInfo;
-      this.backupManager = backupManager;
-    }
-
-
-
-    @Override
-    public Job execute() throws Exception {
-
-      // reflection preparation for private methods and fields
-      Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class;
-      Method methodCleanup = classDistCp.getDeclaredMethod("cleanup");
-
-      Field fieldInputOptions = getInputOptionsField(classDistCp);
-      Field fieldSubmitted = classDistCp.getDeclaredField("submitted");
-
-      methodCleanup.setAccessible(true);
-      fieldInputOptions.setAccessible(true);
-      fieldSubmitted.setAccessible(true);
-
-      // execute() logic starts here
-      assert fieldInputOptions.get(this) != null;
-
-      Job job = null;
-      try {
-
-        List<Path> srcs = getSourcePaths(fieldInputOptions);
-
-        long totalSrcLgth = 0;
-        for (Path aSrc : srcs) {
-          totalSrcLgth +=
-              BackupUtils.getFilesLength(aSrc.getFileSystem(super.getConf()), aSrc);
-        }
-
-        // Async call
-        job = super.execute();
-        // Update the copy progress in the system table whenever the progress value changes,
-        // polling every hbase.backup.progressreport.frequency ms (500 ms by default)
-        int progressReportFreq =
-            MapReduceBackupCopyJob.this.getConf().getInt("hbase.backup.progressreport.frequency",
-              500);
-        float lastProgress = progressDone;
-        while (!job.isComplete()) {
-          float newProgress =
-              progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
-
-          if (newProgress > lastProgress) {
-
-            BigDecimal progressData =
-                new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP);
-            String newProgressStr = progressData + "%";
-            LOG.info("Progress: " + newProgressStr);
-            updateProgress(backupInfo, backupManager, progressData.intValue(), bytesCopied);
-            LOG.debug("Backup progress data updated to backup system table: \"Progress: "
-                + newProgressStr + ".\"");
-            lastProgress = newProgress;
-          }
-          Thread.sleep(progressReportFreq);
-        }
-        // update the progress data after copy job complete
-        float newProgress =
-            progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
-        BigDecimal progressData =
-            new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP);
-
-        String newProgressStr = progressData + "%";
-        LOG.info("Progress: " + newProgressStr + " subTask: " + subTaskPercntgInWholeTask
-            + " mapProgress: " + job.mapProgress());
-
-        // accumulate the overall backup progress
-        progressDone = newProgress;
-        bytesCopied += totalSrcLgth;
-
-        updateProgress(backupInfo, backupManager, progressData.intValue(), bytesCopied);
-        LOG.debug("Backup progress data updated to backup system table: \"Progress: "
-            + newProgressStr + " - " + bytesCopied + " bytes copied.\"");
-      } catch (Throwable t) {
-        LOG.error(t);
-        throw t;
-      }
-
-      String jobID = job.getJobID().toString();
-      job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
-
-      LOG.debug("DistCp job-id: " + jobID + " completed: " + job.isComplete() + " "
-          + job.isSuccessful());
-      Counters ctrs = job.getCounters();
-      LOG.debug(ctrs);
-      if (job.isComplete() && !job.isSuccessful()) {
-        throw new Exception("DistCp job-id: " + jobID + " failed");
-      }
-
-      return job;
-    }
-
-    private Field getInputOptionsField(Class<?> classDistCp) throws IOException {
-      Field f = null;
-      try {
-        f = classDistCp.getDeclaredField("inputOptions");
-      } catch (Exception e) {
-        // Hadoop 3
-        try {
-          f = classDistCp.getDeclaredField("context");
-        } catch (NoSuchFieldException | SecurityException e1) {
-          throw new IOException(e1);
-        }
-      }
-      return f;
-    }
-
-    @SuppressWarnings("unchecked")
-    private List<Path> getSourcePaths(Field fieldInputOptions) throws IOException {
-      Object options;
-      try {
-        options = fieldInputOptions.get(this);
-        if (options instanceof DistCpOptions) {
-          return ((DistCpOptions) options).getSourcePaths();
-        } else {
-          // Hadoop 3
-          Class<?> classContext = Class.forName("org.apache.hadoop.tools.DistCpContext");
-          Method methodGetSourcePaths = classContext.getDeclaredMethod("getSourcePaths");
-          methodGetSourcePaths.setAccessible(true);
-
-          return (List<Path>) methodGetSourcePaths.invoke(options);
-        }
-      } catch (IllegalArgumentException | IllegalAccessException |
-                ClassNotFoundException | NoSuchMethodException |
-                SecurityException | InvocationTargetException e) {
-        throw new IOException(e);
-      }
-
-    }
-
-    @Override
-    protected Path createInputFileListing(Job job) throws IOException {
-
-      if (conf.get(NUMBER_OF_LEVELS_TO_PRESERVE_KEY) == null) {
-        return super.createInputFileListing(job);
-      }
-      long totalBytesExpected = 0;
-      int totalRecords = 0;
-      Path fileListingPath = getFileListingPath();
-      try (SequenceFile.Writer writer = getWriter(fileListingPath);) {
-        List<Path> srcFiles = getSourceFiles();
-        if (srcFiles.size() == 0) {
-          return fileListingPath;
-        }
-        totalRecords = srcFiles.size();
-        FileSystem fs = srcFiles.get(0).getFileSystem(conf);
-        for (Path path : srcFiles) {
-          FileStatus fst = fs.getFileStatus(path);
-          totalBytesExpected += fst.getLen();
-          Text key = getKey(path);
-          writer.append(key, new CopyListingFileStatus(fst));
-        }
-        writer.close();
-
-        // update the job's configuration
-
-        Configuration cfg = job.getConfiguration();
-        cfg.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, totalBytesExpected);
-        cfg.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, fileListingPath.toString());
-        cfg.setLong(DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, totalRecords);
-      } catch (NoSuchFieldException | SecurityException | IllegalArgumentException
-          | IllegalAccessException | NoSuchMethodException | ClassNotFoundException
-          | InvocationTargetException e) {
-        throw new IOException(e);
-      }
-      return fileListingPath;
-    }
-
-    private Text getKey(Path path) {
-      int level = conf.getInt(NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 1);
-      int count = 0;
-      String relPath = "";
-      while (count++ < level) {
-        relPath = Path.SEPARATOR + path.getName() + relPath;
-        path = path.getParent();
-      }
-      return new Text(relPath);
-    }
-
-    private List<Path> getSourceFiles() throws NoSuchFieldException, SecurityException,
-        IllegalArgumentException, IllegalAccessException, NoSuchMethodException,
-        ClassNotFoundException, InvocationTargetException, IOException {
-      Field options = null;
-      try {
-        options = DistCp.class.getDeclaredField("inputOptions");
-      } catch (NoSuchFieldException | SecurityException e) {
-        options = DistCp.class.getDeclaredField("context");
-      }
-      options.setAccessible(true);
-      return getSourcePaths(options);
-    }
-
-
-
-    private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException {
-      FileSystem fs = pathToListFile.getFileSystem(conf);
-      fs.delete(pathToListFile, false);
-      return SequenceFile.createWriter(conf, SequenceFile.Writer.file(pathToListFile),
-        SequenceFile.Writer.keyClass(Text.class),
-        SequenceFile.Writer.valueClass(CopyListingFileStatus.class),
-        SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
-    }
-
-  }
-
-  /**
-   * Do backup copy based on different types.
-   * @param context The backup info
-   * @param backupManager The backup manager
-   * @param conf The hadoop configuration
-   * @param copyType The backup copy type
-   * @param options Options for customized ExportSnapshot or DistCp
-   * @throws IOException if the copy job fails
-   */
-  @Override
-  public int copy(BackupInfo context, BackupManager backupManager, Configuration conf,
-      BackupType copyType, String[] options) throws IOException {
-    int res = 0;
-
-    try {
-      if (copyType == BackupType.FULL) {
-        SnapshotCopy snapshotCp = new SnapshotCopy(context, context.getTableBySnapshot(options[1]));
-        LOG.debug("Doing SNAPSHOT_COPY");
-        // Make a new instance of conf to be used by the snapshot copy class.
-        snapshotCp.setConf(new Configuration(conf));
-        res = snapshotCp.run(options);
-
-      } else if (copyType == BackupType.INCREMENTAL) {
-        LOG.debug("Doing COPY_TYPE_DISTCP");
-        setSubTaskPercntgInWholeTask(1f);
-
-        BackupDistCp distcp =
-            new BackupDistCp(new Configuration(conf), null, context, backupManager);
-        // Handle a special case where the source is a single file.
-        // In this case, distcp will not create the target dir. It just takes the
-        // target as a file name and copies the source file to it.
-        // We need to create the target dir before running distcp.
-        LOG.debug("DistCp options: " + Arrays.toString(options));
-        Path dest = new Path(options[options.length - 1]);
-        String[] newOptions = new String[options.length + 1];
-        System.arraycopy(options, 0, newOptions, 1, options.length);
-        newOptions[0] = "-async"; // run DistCp in async mode
-        FileSystem destfs = dest.getFileSystem(conf);
-        if (!destfs.exists(dest)) {
-          destfs.mkdirs(dest);
-        }
-        res = distcp.run(newOptions);
-      }
-      return res;
-
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public void cancel(String jobId) throws IOException {
-    JobID id = JobID.forName(jobId);
-    Cluster cluster = new Cluster(this.getConf());
-    try {
-      Job job = cluster.getJob(id);
-      if (job == null) {
-        LOG.error("No job found for " + id);
-        // should we throw an exception here instead?
-        return;
-      }
-      if (job.isComplete() || job.isRetired()) {
-        return;
-      }
-
-      job.killJob();
-      LOG.debug("Killed copy job " + id);
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    }
-  }
-
-}
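
For reference, the Hadoop 2 vs. Hadoop 3 reflection shim used by the removed BackupDistCp
(probing the private "inputOptions" field first and falling back to "context") can be sketched in
isolation as below. The class name DistCpCompat and its method are invented for this
illustration; only the two field names come from the removed code.

    import java.io.IOException;
    import java.lang.reflect.Field;

    import org.apache.hadoop.tools.DistCp;

    final class DistCpCompat {
      private DistCpCompat() {
      }

      /**
       * Resolves the DistCp member that carries the copy options: "inputOptions" on
       * Hadoop 2, "context" (a DistCpContext) on Hadoop 3.
       */
      static Field resolveOptionsField() throws IOException {
        Class<?> distCpClass = DistCp.class;
        try {
          return distCpClass.getDeclaredField("inputOptions"); // Hadoop 2.x layout
        } catch (NoSuchFieldException | SecurityException e) {
          try {
            return distCpClass.getDeclaredField("context"); // Hadoop 3.x layout
          } catch (NoSuchFieldException | SecurityException e1) {
            throw new IOException("Unrecognized DistCp internals", e1);
          }
        }
      }
    }

Callers would still need to call setAccessible(true) on the returned Field before reading it, as
the removed execute() did.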