You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2017/04/05 01:20:22 UTC

hbase git commit: HBASE-14141 HBase Backup/Restore Phase 3: Filter WALs on backup to include only edits from backed up tables (Vladimir Rodionov)

Repository: hbase
Updated Branches:
  refs/heads/master e916b79db -> 910b68082


HBASE-14141 HBase Backup/Restore Phase 3: Filter WALs on backup to include only edits from backed up tables (Vladimir Rodionov)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/910b6808
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/910b6808
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/910b6808

Branch: refs/heads/master
Commit: 910b68082c8f200f0ba6395a76b7ee1c8917e401
Parents: e916b79
Author: tedyu <yu...@gmail.com>
Authored: Tue Apr 4 18:20:11 2017 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Tue Apr 4 18:20:11 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/backup/impl/BackupManager.java |   2 +-
 .../backup/impl/IncrementalBackupManager.java   |  89 ++++++--
 .../impl/IncrementalTableBackupClient.java      | 211 +++++++++++--------
 .../hbase/backup/impl/RestoreTablesClient.java  |   5 +-
 .../hbase/backup/impl/TableBackupClient.java    |   4 -
 .../backup/mapreduce/HFileSplitterJob.java      |   2 +-
 .../backup/mapreduce/MapReduceRestoreJob.java   |  14 +-
 .../hadoop/hbase/backup/util/RestoreTool.java   | 134 ++----------
 .../hadoop/hbase/mapreduce/WALInputFormat.java  | 119 +++++++----
 .../hadoop/hbase/mapreduce/WALPlayer.java       |  10 +-
 .../hadoop/hbase/wal/AbstractFSWALProvider.java | 101 +++++++++
 11 files changed, 410 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/910b6808/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index c09ce48..f09310f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -466,7 +466,7 @@ public class BackupManager implements Closeable {
 
   /**
    * Saves list of WAL files after incremental backup operation. These files will be stored until
-   * TTL expiration and are used by Backup Log Cleaner plugin to determine which WAL files can be
+   * TTL expiration and are used by Backup Log Cleaner plug-in to determine which WAL files can be
    * safely purged.
    */
   public void recordWALFiles(List<String> files) throws IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/910b6808/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
index 0f1453e..6330899 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
@@ -21,8 +21,10 @@ package org.apache.hadoop.hbase.backup.impl;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,7 +35,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable.WALItem;
 import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
@@ -59,12 +60,10 @@ public class IncrementalBackupManager extends BackupManager {
   /**
    * Obtain the list of logs that need to be copied out for this incremental backup. The list is set
    * in BackupInfo.
-   * @param conn the Connection
-   * @param backupInfo backup info
-   * @return The new HashMap of RS log timestamps after the log roll for this incremental backup.
+   * @return The new HashMap of RS log time stamps after the log roll for this incremental backup.
    * @throws IOException exception
    */
-  public HashMap<String, Long> getIncrBackupLogFileList(Connection conn, BackupInfo backupInfo)
+  public HashMap<String, Long> getIncrBackupLogFileMap()
       throws IOException {
     List<String> logList;
     HashMap<String, Long> newTimestamps;
@@ -105,40 +104,84 @@ public class IncrementalBackupManager extends BackupManager {
     List<WALItem> logFromSystemTable =
         getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps, getBackupInfo()
             .getBackupRootDir());
-    addLogsFromBackupSystemToContext(logFromSystemTable);
-
     logList = excludeAlreadyBackedUpWALs(logList, logFromSystemTable);
     backupInfo.setIncrBackupFileList(logList);
 
     return newTimestamps;
   }
 
-  private List<String> excludeAlreadyBackedUpWALs(List<String> logList,
-      List<WALItem> logFromSystemTable) {
+  /**
+   * Get list of WAL files eligible for incremental backup
+   * @return list of WAL files
+   * @throws IOException
+   */
+  public List<String> getIncrBackupLogFileList()
+      throws IOException {
+    List<String> logList;
+    HashMap<String, Long> newTimestamps;
+    HashMap<String, Long> previousTimestampMins;
+
+    String savedStartCode = readBackupStartCode();
+
+    // key: tableName
+    // value: <RegionServer,PreviousTimeStamp>
+    HashMap<TableName, HashMap<String, Long>> previousTimestampMap = readLogTimestampMap();
+
+    previousTimestampMins = BackupUtils.getRSLogTimestampMins(previousTimestampMap);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("StartCode " + savedStartCode + "for backupID " + backupInfo.getBackupId());
+    }
+    // get all new log files from .logs and .oldlogs after last TS and before new timestamp
+    if (savedStartCode == null || previousTimestampMins == null
+        || previousTimestampMins.isEmpty()) {
+      throw new IOException(
+          "Cannot read any previous back up timestamps from backup system table. "
+              + "In order to create an incremental backup, at least one full backup is needed.");
+    }
+
+    newTimestamps = readRegionServerLastLogRollResult();
+
+    logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
+    List<WALItem> logFromSystemTable =
+        getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps, getBackupInfo()
+            .getBackupRootDir());
+
+    logList = excludeAlreadyBackedUpWALs(logList, logFromSystemTable);
+    backupInfo.setIncrBackupFileList(logList);
 
-    List<String> backupedWALList = toWALList(logFromSystemTable);
-    logList.removeAll(backupedWALList);
     return logList;
   }
 
-  private List<String> toWALList(List<WALItem> logFromSystemTable) {
 
-    List<String> list = new ArrayList<String>(logFromSystemTable.size());
-    for (WALItem item : logFromSystemTable) {
-      list.add(item.getWalFile());
+  private List<String> excludeAlreadyBackedUpWALs(List<String> logList,
+      List<WALItem> logFromSystemTable) {
+
+    Set<String> walFileNameSet = convertToSet(logFromSystemTable);
+
+    List<String> list = new ArrayList<String>();
+    for (int i=0; i < logList.size(); i++) {
+      Path p = new Path(logList.get(i));
+      String name  = p.getName();
+      if (walFileNameSet.contains(name)) continue;
+      list.add(logList.get(i));
     }
     return list;
   }
 
-  private void addLogsFromBackupSystemToContext(List<WALItem> logFromSystemTable) {
-    List<String> walFiles = new ArrayList<String>();
-    for (WALItem item : logFromSystemTable) {
-      Path p = new Path(item.getWalFile());
-      String walFileName = p.getName();
-      String backupId = item.getBackupId();
-      String relWALPath = backupId + Path.SEPARATOR + walFileName;
-      walFiles.add(relWALPath);
+  /**
+   * Create Set of WAL file names (not full path names)
+   * @param logFromSystemTable
+   * @return set of WAL file names
+   */
+  private Set<String> convertToSet(List<WALItem> logFromSystemTable) {
+
+    Set<String> set = new HashSet<String>();
+    for (int i=0; i < logFromSystemTable.size(); i++) {
+      WALItem item = logFromSystemTable.get(i);
+      set.add(item.walFile);
     }
+    return set;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/910b6808/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 8f6f264..3003c93 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.commons.lang.StringUtils;
@@ -34,7 +35,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupCopyJob;
 import org.apache.hadoop.hbase.backup.BackupInfo;
@@ -45,11 +45,15 @@ import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.util.Tool;
 
 /**
  * Incremental backup implementation.
@@ -69,7 +73,8 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     FileSystem fs = FileSystem.get(conf);
     List<String> list = new ArrayList<String>();
     for (String file : incrBackupFileList) {
-      if (fs.exists(new Path(file))) {
+      Path p = new Path(file);
+      if (fs.exists(p) || isActiveWalPath(p)) {
         list.add(file);
       } else {
         LOG.warn("Can't find file: " + file);
@@ -78,90 +83,13 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     return list;
   }
 
-  private List<String> getMissingFiles(List<String> incrBackupFileList) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    List<String> list = new ArrayList<String>();
-    for (String file : incrBackupFileList) {
-      if (!fs.exists(new Path(file))) {
-        list.add(file);
-      }
-    }
-    return list;
-
-  }
-
   /**
-   * Do incremental copy.
-   * @param backupInfo backup info
+   * Check if a given path belongs to the active WAL directory
+   * @param p path
+   * @return true, if yes
    */
-  private void incrementalCopy(BackupInfo backupInfo) throws Exception {
-
-    LOG.info("Incremental copy is starting.");
-    // set overall backup phase: incremental_copy
-    backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
-    // get incremental backup file list and prepare parms for DistCp
-    List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
-    // filter missing files out (they have been copied by previous backups)
-    incrBackupFileList = filterMissingFiles(incrBackupFileList);
-    String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]);
-    strArr[strArr.length - 1] = backupInfo.getHLogTargetDir();
-
-    BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
-    int counter = 0;
-    int MAX_ITERAIONS = 2;
-    while (counter++ < MAX_ITERAIONS) {
-      // We run DistCp maximum 2 times
-      // If it fails on a second time, we throw Exception
-      int res =
-          copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
-
-      if (res != 0) {
-        LOG.error("Copy incremental log files failed with return code: " + res + ".");
-        throw new IOException("Failed of Hadoop Distributed Copy from "
-            + StringUtils.join(incrBackupFileList, ",") + " to "
-            + backupInfo.getHLogTargetDir());
-      }
-      List<String> missingFiles = getMissingFiles(incrBackupFileList);
-
-      if (missingFiles.isEmpty()) {
-        break;
-      } else {
-        // Repeat DistCp, some files have been moved from WALs to oldWALs during previous run
-        // update backupInfo and strAttr
-        if (counter == MAX_ITERAIONS) {
-          String msg =
-              "DistCp could not finish the following files: " + StringUtils.join(missingFiles, ",");
-          LOG.error(msg);
-          throw new IOException(msg);
-        }
-        List<String> converted = convertFilesFromWALtoOldWAL(missingFiles);
-        incrBackupFileList.removeAll(missingFiles);
-        incrBackupFileList.addAll(converted);
-        backupInfo.setIncrBackupFileList(incrBackupFileList);
-
-        // Run DistCp only for missing files (which have been moved from WALs to oldWALs
-        // during previous run)
-        strArr = converted.toArray(new String[converted.size() + 1]);
-        strArr[strArr.length - 1] = backupInfo.getHLogTargetDir();
-      }
-    }
-
-    LOG.info("Incremental copy from " + StringUtils.join(incrBackupFileList, ",") + " to "
-        + backupInfo.getHLogTargetDir() + " finished.");
-  }
-
-  private List<String> convertFilesFromWALtoOldWAL(List<String> missingFiles) throws IOException {
-    List<String> list = new ArrayList<String>();
-    for (String path : missingFiles) {
-      if (path.indexOf(Path.SEPARATOR + HConstants.HREGION_LOGDIR_NAME) < 0) {
-        LOG.error("Copy incremental log files failed, file is missing : " + path);
-        throw new IOException("Failed of Hadoop Distributed Copy to "
-            + backupInfo.getHLogTargetDir() + ", file is missing " + path);
-      }
-      list.add(path.replace(Path.SEPARATOR + HConstants.HREGION_LOGDIR_NAME, Path.SEPARATOR
-          + HConstants.HREGION_OLDLOGDIR_NAME));
-    }
-    return list;
+  private boolean isActiveWalPath(Path p) {
+    return !AbstractFSWALProvider.isArchivedLogFile(p);
   }
 
   static int getIndex(TableName tbl, List<TableName> sTableList) {
@@ -286,7 +214,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
         + backupManager.getIncrementalBackupTableSet());
     try {
       newTimestamps =
-          ((IncrementalBackupManager) backupManager).getIncrBackupLogFileList(conn, backupInfo);
+          ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
     } catch (Exception e) {
       // fail the overall backup and return
       failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
@@ -297,13 +225,16 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     try {
       // copy out the table and region info files for each table
       BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
-      incrementalCopy(backupInfo);
+      // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
+      convertWALsToHFiles(backupInfo);
+      incrementalCopyHFiles(backupInfo);
       // Save list of WAL files copied
       backupManager.recordWALFiles(backupInfo.getIncrBackupFileList());
     } catch (Exception e) {
       String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId;
       // fail the overall backup and return
       failBackup(conn, backupInfo, backupManager, e, msg, BackupType.INCREMENTAL, conf);
+      return;
     }
     // case INCR_BACKUP_COMPLETE:
     // set overall backup status: complete. Here we make sure to complete the backup.
@@ -323,8 +254,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
           backupManager.readLogTimestampMap();
 
       Long newStartCode =
-          BackupUtils.getMinValue(BackupUtils
-              .getRSLogTimestampMins(newTableSetTimestampMap));
+          BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
       backupManager.writeBackupStartCode(newStartCode);
 
       handleBulkLoad(backupInfo.getTableNames());
@@ -337,4 +267,109 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     }
   }
 
+  private void incrementalCopyHFiles(BackupInfo backupInfo) throws Exception {
+
+    try {
+      LOG.debug("Incremental copy HFiles is starting.");
+      // set overall backup phase: incremental_copy
+      backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
+      // get incremental backup file list and prepare parms for DistCp
+      List<String> incrBackupFileList = new ArrayList<String>();
+      // Add Bulk output
+      incrBackupFileList.add(getBulkOutputDir().toString());
+      String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]);
+      strArr[strArr.length - 1] = backupInfo.getBackupRootDir();
+      BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
+      int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
+      if (res != 0) {
+        LOG.error("Copy incremental HFile files failed with return code: " + res + ".");
+        throw new IOException("Failed copy from " + StringUtils.join(incrBackupFileList, ',')
+            + " to " + backupInfo.getHLogTargetDir());
+      }
+      LOG.debug("Incremental copy HFiles from " + StringUtils.join(incrBackupFileList, ',')
+          + " to " + backupInfo.getBackupRootDir() + " finished.");
+    } finally {
+      deleteBulkLoadDirectory();
+    }
+  }
+
+  private void deleteBulkLoadDirectory() throws IOException {
+    // delete original bulk load directory on method exit
+    Path path = getBulkOutputDir();
+    FileSystem fs = FileSystem.get(conf);
+    boolean result = fs.delete(path, true);
+    if (!result) {
+      LOG.warn("Could not delete " + path);
+    }
+
+  }
+
+  private void convertWALsToHFiles(BackupInfo backupInfo) throws IOException {
+    // get incremental backup file list and prepare parameters for WALPlayer
+    List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
+    // Get list of tables in incremental backup set
+    Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
+    // filter missing files out (they have been copied by previous backups)
+    incrBackupFileList = filterMissingFiles(incrBackupFileList);
+    for (TableName table : tableSet) {
+      // Check if table exists
+      if (tableExists(table, conn)) {
+        walToHFiles(incrBackupFileList, table);
+      } else {
+        LOG.warn("Table " + table + " does not exists. Skipping in WAL converter");
+      }
+    }
+  }
+
+
+  private boolean tableExists(TableName table, Connection conn) throws IOException {
+    try (Admin admin = conn.getAdmin();) {
+      return admin.tableExists(table);
+    }
+  }
+
+  private void walToHFiles(List<String> dirPaths, TableName tableName) throws IOException {
+
+    Tool player = new WALPlayer();
+
+    // Player reads all files in arbitrary directory structure and creates
+    // a Map task for each file. We use ';' as separator
+    // because WAL file names contain ','
+    String dirs = StringUtils.join(dirPaths, ';');
+
+    Path bulkOutputPath = getBulkOutputDirForTable(tableName);
+    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
+    conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
+    String[] playerArgs = { dirs, tableName.getNameAsString() };
+
+    try {
+      player.setConf(conf);
+      int result = player.run(playerArgs);
+      if(result != 0) {
+        throw new IOException("WAL Player failed");
+      }
+      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception ee) {
+      throw new IOException("Can not convert from directory " + dirs
+          + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee);
+    }
+  }
+
+  private Path getBulkOutputDirForTable(TableName table) {
+    Path tablePath = getBulkOutputDir();
+    tablePath = new Path(tablePath, table.getNamespaceAsString());
+    tablePath = new Path(tablePath, table.getQualifierAsString());
+    return new Path(tablePath, "data");
+  }
+
+  private Path getBulkOutputDir() {
+    String backupId = backupInfo.getBackupId();
+    Path path = new Path(backupInfo.getBackupRootDir());
+    path = new Path(path, ".tmp");
+    path = new Path(path, backupId);
+    return path;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/910b6808/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
index 2e4ecce..381e9b1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
@@ -169,8 +169,9 @@ public class RestoreTablesClient {
     // full backup path comes first
     for (int i = 1; i < images.length; i++) {
       BackupImage im = images[i];
-      String logBackupDir = HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId());
-      dirList.add(new Path(logBackupDir));
+      String fileBackupDir = HBackupFileSystem.getTableBackupDir(im.getRootDir(),
+                  im.getBackupId(), sTable)+ Path.SEPARATOR+"data";
+      dirList.add(new Path(fileBackupDir));
     }
 
     String dirs = StringUtils.join(dirList, ",");

http://git-wip-us.apache.org/repos/asf/hbase/blob/910b6808/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
index 42a8076..125b5da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
@@ -351,10 +351,6 @@ public abstract class TableBackupClient {
     // add and store the manifest for the backup
     addManifest(backupInfo, backupManager, type, conf);
 
-    // after major steps done and manifest persisted, do convert if needed for incremental backup
-    /* in-fly convert code here, provided by future jira */
-    LOG.debug("in-fly convert code here, provided by future jira");
-
     // compose the backup complete data
     String backupCompleteData =
         obtainBackupMetaDataStr(backupInfo) + ",startts=" + backupInfo.getStartTs()

http://git-wip-us.apache.org/repos/asf/hbase/blob/910b6808/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
index 5641720..604e502 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
@@ -107,11 +107,11 @@ public class HFileSplitterJob extends Configured implements Tool {
     String inputDirs = args[0];
     String tabName = args[1];
     conf.setStrings(TABLES_KEY, tabName);
+    conf.set(FileInputFormat.INPUT_DIR, inputDirs);
     Job job =
         Job.getInstance(conf,
           conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
     job.setJarByClass(HFileSplitterJob.class);
-    FileInputFormat.addInputPaths(job, inputDirs);
     job.setInputFormatClass(HFileInputFormat.class);
     job.setMapOutputKeyClass(ImmutableBytesWritable.class);
     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);

http://git-wip-us.apache.org/repos/asf/hbase/blob/910b6808/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
index 9bafe12..4161ca9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
@@ -62,13 +62,8 @@ public class MapReduceRestoreJob implements RestoreJob {
 
     String bulkOutputConfKey;
 
-    if (fullBackupRestore) {
-      player = new HFileSplitterJob();
-      bulkOutputConfKey = HFileSplitterJob.BULK_OUTPUT_CONF_KEY;
-    } else {
-      player = new WALPlayer();
-      bulkOutputConfKey = WALPlayer.BULK_OUTPUT_CONF_KEY;
-    }
+    player = new HFileSplitterJob();
+    bulkOutputConfKey = HFileSplitterJob.BULK_OUTPUT_CONF_KEY;
     // Player reads all files in arbitrary directory structure and creates
     // a Map task for each file
     String dirs = StringUtils.join(dirPaths, ",");
@@ -88,7 +83,10 @@ public class MapReduceRestoreJob implements RestoreJob {
       Path bulkOutputPath = getBulkOutputDir(getFileNameCompatibleString(newTableNames[i]));
       Configuration conf = getConf();
       conf.set(bulkOutputConfKey, bulkOutputPath.toString());
-      String[] playerArgs = { dirs, tableNames[i].getNameAsString() };
+      String[] playerArgs =
+        { dirs,
+          fullBackupRestore? newTableNames[i].getNameAsString():tableNames[i].getNameAsString()
+        };
 
       int result = 0;
       int loaderResult = 0;

http://git-wip-us.apache.org/repos/asf/hbase/blob/910b6808/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
index 79adcab..d34701f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
@@ -46,8 +46,6 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
-import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
@@ -63,19 +61,13 @@ import org.apache.hadoop.hbase.util.FSTableDescriptors;
 public class RestoreTool {
 
   public static final Log LOG = LogFactory.getLog(BackupUtils.class);
-
-  private final String[] ignoreDirs = { HConstants.RECOVERED_EDITS_DIR };
-
   private final static long TABLE_AVAILABILITY_WAIT_TIME = 180000;
 
+  private final String[] ignoreDirs = { HConstants.RECOVERED_EDITS_DIR };
   protected Configuration conf = null;
-
   protected Path backupRootPath;
-
   protected String backupId;
-
   protected FileSystem fs;
-  private final Path restoreTmpPath;
 
   // store table name and snapshot dir mapping
   private final HashMap<TableName, Path> snapshotMap = new HashMap<>();
@@ -86,9 +78,6 @@ public class RestoreTool {
     this.backupRootPath = backupRootPath;
     this.backupId = backupId;
     this.fs = backupRootPath.getFileSystem(conf);
-    this.restoreTmpPath =
-        new Path(conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
-          HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY), "restore");
   }
 
   /**
@@ -218,7 +207,7 @@ public class RestoreTool {
   public void fullRestoreTable(Connection conn, Path tableBackupPath, TableName tableName,
       TableName newTableName, boolean truncateIfExists, String lastIncrBackupId)
           throws IOException {
-    restoreTableAndCreate(conn, tableName, newTableName, tableBackupPath, truncateIfExists,
+    createAndRestoreTable(conn, tableName, newTableName, tableBackupPath, truncateIfExists,
       lastIncrBackupId);
   }
 
@@ -281,48 +270,6 @@ public class RestoreTool {
     return tableDescriptor;
   }
 
-  /**
-   * Duplicate the backup image if it's on local cluster
-   * @see HStore#bulkLoadHFile(org.apache.hadoop.hbase.regionserver.StoreFile)
-   * @see HRegionFileSystem#bulkLoadStoreFile(String familyName, Path srcPath, long seqNum)
-   * @param tableArchivePath archive path
-   * @return the new tableArchivePath
-   * @throws IOException exception
-   */
-  Path checkLocalAndBackup(Path tableArchivePath) throws IOException {
-    // Move the file if it's on local cluster
-    boolean isCopyNeeded = false;
-
-    FileSystem srcFs = tableArchivePath.getFileSystem(conf);
-    FileSystem desFs = FileSystem.get(conf);
-    if (tableArchivePath.getName().startsWith("/")) {
-      isCopyNeeded = true;
-    } else {
-      // This should match what is done in @see HRegionFileSystem#bulkLoadStoreFile(String, Path,
-      // long)
-      if (srcFs.getUri().equals(desFs.getUri())) {
-        LOG.debug("cluster hold the backup image: " + srcFs.getUri() + "; local cluster node: "
-            + desFs.getUri());
-        isCopyNeeded = true;
-      }
-    }
-    if (isCopyNeeded) {
-      LOG.debug("File " + tableArchivePath + " on local cluster, back it up before restore");
-      if (desFs.exists(restoreTmpPath)) {
-        try {
-          desFs.delete(restoreTmpPath, true);
-        } catch (IOException e) {
-          LOG.debug("Failed to delete path: " + restoreTmpPath
-              + ", need to check whether restore target DFS cluster is healthy");
-        }
-      }
-      FileUtil.copy(srcFs, tableArchivePath, desFs, restoreTmpPath, false, conf);
-      LOG.debug("Copied to temporary path on local cluster: " + restoreTmpPath);
-      tableArchivePath = restoreTmpPath;
-    }
-    return tableArchivePath;
-  }
-
   private HTableDescriptor getTableDescriptor(FileSystem fileSys, TableName tableName,
       String lastIncrBackupId) throws IOException {
     if (lastIncrBackupId != null) {
@@ -334,7 +281,7 @@ public class RestoreTool {
     return null;
   }
 
-  private void restoreTableAndCreate(Connection conn, TableName tableName, TableName newTableName,
+  private void createAndRestoreTable(Connection conn, TableName tableName, TableName newTableName,
       Path tableBackupPath, boolean truncateIfExists, String lastIncrBackupId) throws IOException {
     if (newTableName == null) {
       newTableName = tableName;
@@ -403,33 +350,13 @@ public class RestoreTool {
       // the regions in fine grain
       checkAndCreateTable(conn, tableBackupPath, tableName, newTableName, regionPathList,
         tableDescriptor, truncateIfExists);
-      if (tableArchivePath != null) {
-        // start real restore through bulkload
-        // if the backup target is on local cluster, special action needed
-        Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath);
-        if (tempTableArchivePath.equals(tableArchivePath)) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath);
-          }
-        } else {
-          regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath);
-          }
-        }
+      RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);
+      Path[] paths = new Path[regionPathList.size()];
+      regionPathList.toArray(paths);
+      restoreService.run(paths, new TableName[]{tableName}, new TableName[] {newTableName}, true);
 
-        LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false);
-        for (Path regionPath : regionPathList) {
-          String regionName = regionPath.toString();
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Restoring HFiles from directory " + regionName);
-          }
-          String[] args = { regionName, newTableName.getNameAsString() };
-          loader.run(args);
-        }
-      }
-      // we do not recovered edits
     } catch (Exception e) {
+      LOG.error(e);
       throw new IllegalStateException("Cannot restore hbase table", e);
     }
   }
@@ -453,28 +380,6 @@ public class RestoreTool {
   }
 
   /**
-   * Create a {@link LoadIncrementalHFiles} instance to be used to restore the HFiles of a full
-   * backup.
-   * @return the {@link LoadIncrementalHFiles} instance
-   * @throws IOException exception
-   */
-  private LoadIncrementalHFiles createLoader(Path tableArchivePath, boolean multipleTables)
-      throws IOException {
-
-    // By default, it is 32 and loader will fail if # of files in any region exceed this
-    // limit. Bad for snapshot restore.
-    this.conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
-    this.conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
-    LoadIncrementalHFiles loader = null;
-    try {
-      loader = new LoadIncrementalHFiles(this.conf);
-    } catch (Exception e1) {
-      throw new IOException(e1);
-    }
-    return loader;
-  }
-
-  /**
    * Calculate region boundaries and add all the column families to the table descriptor
    * @param regionDirList region dir list
    * @return a set of keys to store the boundaries
@@ -591,17 +496,18 @@ public class RestoreTool {
           // create table using table descriptor and region boundaries
           admin.createTable(htd, keys);
         }
-        long startTime = EnvironmentEdgeManager.currentTime();
-        while (!admin.isTableAvailable(targetTableName, keys)) {
-          try {
-            Thread.sleep(100);
-          } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-          }
-          if (EnvironmentEdgeManager.currentTime() - startTime > TABLE_AVAILABILITY_WAIT_TIME) {
-            throw new IOException("Time out " + TABLE_AVAILABILITY_WAIT_TIME + "ms expired, table "
-                + targetTableName + " is still not available");
-          }
+
+      }
+      long startTime = EnvironmentEdgeManager.currentTime();
+      while (!admin.isTableAvailable(targetTableName)) {
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+        }
+        if (EnvironmentEdgeManager.currentTime() - startTime > TABLE_AVAILABILITY_WAIT_TIME) {
+          throw new IOException("Time out " + TABLE_AVAILABILITY_WAIT_TIME + "ms expired, table "
+              + targetTableName + " is still not available");
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/910b6808/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
index 0ca78b4..8b4e967 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.mapreduce;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.EOFException;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -35,10 +36,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WAL.Reader;
-import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -46,6 +47,7 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -142,56 +144,89 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
     Entry currentEntry = new Entry();
     private long startTime;
     private long endTime;
+    private Configuration conf;
+    private Path logFile;
+    private long currentPos;
 
     @Override
     public void initialize(InputSplit split, TaskAttemptContext context)
         throws IOException, InterruptedException {
       WALSplit hsplit = (WALSplit)split;
-      Path logFile = new Path(hsplit.getLogFileName());
-      Configuration conf = context.getConfiguration();
+      logFile = new Path(hsplit.getLogFileName()); // field: reused when a read must be retried
+      conf = context.getConfiguration(); // field: needed to reopen the reader later
       LOG.info("Opening reader for "+split);
-      try {
-        this.reader = WALFactory.createReader(logFile.getFileSystem(conf), logFile, conf);
-      } catch (EOFException x) {
-        LOG.info("Ignoring corrupted WAL file: " + logFile
-            + " (This is normal when a RegionServer crashed.)");
-        this.reader = null;
-      }
+      openReader(logFile); // NOTE(review): EOFException on open is no longer swallowed - confirm
       this.startTime = hsplit.getStartTime();
       this.endTime = hsplit.getEndTime();
     }
 
+    // Replaces the current reader with one for the given WAL path, resuming at currentPos.
+    private void openReader(Path path) throws IOException {
+      closeReader();
+      this.reader = AbstractFSWALProvider.openReader(path, conf);
+      seek();
+      setCurrentPath(path);
+    }
+
+    private void setCurrentPath(Path path) {
+      this.logFile = path; // nextKeyValue() derives the archived location from this on failure
+    }
+
+    // Closes the active reader, if there is one, and clears the field.
+    private void closeReader() throws IOException {
+      if (reader == null) return;
+      reader.close();
+      reader = null;
+    }
+
+    // Moves the reader to the saved offset; an offset of 0 needs no seek.
+    private void seek() throws IOException {
+      if (currentPos == 0) return;
+      reader.seek(currentPos);
+    }
+
     @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
       if (reader == null) return false;
-
+      this.currentPos = reader.getPosition(); // resume point if the reader has to be reopened
       Entry temp;
       long i = -1;
-      do {
-        // skip older entries
-        try {
-          temp = reader.next(currentEntry);
-          i++;
-        } catch (EOFException x) {
-          LOG.warn("Corrupted entry detected. Ignoring the rest of the file."
-              + " (This is normal when a RegionServer crashed.)");
+      try {
+        do {
+          // skip entries written before startTime
+          try {
+            temp = reader.next(currentEntry);
+            i++;
+          } catch (EOFException x) {
+            LOG.warn("Corrupted entry detected. Ignoring the rest of the file."
+                + " (This is normal when a RegionServer crashed.)");
+            return false;
+          }
+        } while (temp != null && temp.getKey().getWriteTime() < startTime);
+
+        if (temp == null) {
+          if (i > 0) LOG.info("Skipped " + i + " entries.");
+          LOG.info("Reached end of file.");
           return false;
+        } else if (i > 0) {
+          LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
+        }
+        boolean res = temp.getKey().getWriteTime() <= endTime;
+        if (!res) {
+          LOG.info("Reached ts: " + temp.getKey().getWriteTime()
+              + " ignoring the rest of the file.");
+        }
+        return res;
+      } catch (IOException e) {
+        Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf);
+        if (!logFile.equals(archivedLog)) { // by value: the lookup builds a new Path per call
+          openReader(archivedLog);
+          // Retry once against the archived copy; a second failure there rethrows below
+          return nextKeyValue();
+        } else {
+          throw e;
         }
       }
-      while(temp != null && temp.getKey().getWriteTime() < startTime);
-
-      if (temp == null) {
-        if (i > 0) LOG.info("Skipped " + i + " entries.");
-        LOG.info("Reached end of file.");
-        return false;
-      } else if (i > 0) {
-        LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
-      }
-      boolean res = temp.getKey().getWriteTime() <= endTime;
-      if (!res) {
-        LOG.info("Reached ts: " + temp.getKey().getWriteTime() + " ignoring the rest of the file.");
-      }
-      }
-      return res;
     }
 
     @Override
@@ -235,6 +270,7 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
   List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey)
       throws IOException, InterruptedException {
     Configuration conf = context.getConfiguration();
+    boolean ignoreMissing = conf.getBoolean(WALPlayer.IGNORE_MISSING_FILES, false);
     Path[] inputPaths = getInputPaths(conf);
     long startTime = conf.getLong(startKey, Long.MIN_VALUE);
     long endTime = conf.getLong(endKey, Long.MAX_VALUE);
@@ -242,8 +278,16 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
     List<FileStatus> allFiles = new ArrayList<FileStatus>();
     for(Path inputPath: inputPaths){
       FileSystem fs = inputPath.getFileSystem(conf);
-      List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime);
-      allFiles.addAll(files);
+      try {
+        List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime);
+        allFiles.addAll(files);
+      } catch (FileNotFoundException e) {
+        if (ignoreMissing) {
+          LOG.warn("File "+ inputPath +" is missing. Skipping it.");
+          continue;
+        }
+        throw e;
+      }
     }
     List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size());
     for (FileStatus file : allFiles) {
@@ -253,8 +297,9 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
   }
 
   private Path[] getInputPaths(Configuration conf) {
-    String inpDirs = conf.get("mapreduce.input.fileinputformat.inputdir");
-    return StringUtils.stringToPath(inpDirs.split(","));
+    String inpDirs = conf.get(FileInputFormat.INPUT_DIR);
+    String separator = conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ","); // applied as a regex
+    return StringUtils.stringToPath(inpDirs.split(separator));
   }
 
   private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)

http://git-wip-us.apache.org/repos/asf/hbase/blob/910b6808/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index d16dcf5..d15ffcf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -58,7 +58,7 @@ import org.apache.hadoop.util.ToolRunner;
 /**
  * A tool to replay WAL files as a M/R job.
  * The WAL can be replayed for a set of tables or all tables,
- * and a timerange can be provided (in milliseconds).
+ * and a time range can be provided (in milliseconds).
  * The WAL is filtered to the passed set of tables and  the output
  * can optionally be mapped to another set of tables.
  *
@@ -73,6 +73,9 @@ public class WALPlayer extends Configured implements Tool {
   public final static String BULK_OUTPUT_CONF_KEY = "wal.bulk.output";
   public final static String TABLES_KEY = "wal.input.tables";
   public final static String TABLE_MAP_KEY = "wal.input.tablesmap";
+  public final static String INPUT_FILES_SEPARATOR_KEY = "wal.input.separator";
+  public final static String IGNORE_MISSING_FILES = "wal.input.ignore.missing.files";
+
 
   // This relies on Hadoop Configuration to handle warning about deprecated configs and
   // to set the correct non-deprecated configs when an old one shows up.
@@ -128,7 +131,9 @@ public class WALPlayer extends Configured implements Tool {
         throw new IOException("Exactly one table must be specified for bulk HFile case.");
       }
       table = Bytes.toBytes(tables[0]);
+
     }
+
   }
 
   /**
@@ -280,11 +285,10 @@ public class WALPlayer extends Configured implements Tool {
     }
     conf.setStrings(TABLES_KEY, tables);
     conf.setStrings(TABLE_MAP_KEY, tableMap);
+    conf.set(FileInputFormat.INPUT_DIR, inputDirs);
     Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + System.currentTimeMillis()));
     job.setJarByClass(WALPlayer.class);
 
-    FileInputFormat.addInputPaths(job, inputDirs);
-
     job.setInputFormatClass(WALInputFormat.class);
     job.setMapOutputKeyClass(ImmutableBytesWritable.class);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/910b6808/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index bf14933..28b7fda 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.wal;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -36,6 +37,9 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -374,6 +378,103 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
   }
 
   /**
+   * Look up a WAL file by name in the HREGION_OLDLOGDIR_NAME directory of the HBase root dir.
+   * @param path active WAL file path; only its file name is used for the lookup
+   * @param conf configuration used to resolve the root directory and the file system
+   * @return the archived location if a file exists there, otherwise {@code path} itself
+   * @throws IOException if resolving the root dir or file system, or checking existence, fails
+   */
+  public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException {
+    Path rootDir = FSUtils.getRootDir(conf);
+    Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    Path archivedLogLocation = new Path(oldLogDir, path.getName());
+    final FileSystem fs = FSUtils.getCurrentFileSystem(conf);
+
+    if (fs.exists(archivedLogLocation)) {
+      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
+      return archivedLogLocation;
+    } else {
+      LOG.error("Couldn't locate log: " + path);
+      return path; // the input instance itself signals that no archived copy was found
+    }
+  }
+
+  /**
+   * Opens a WAL reader, retrying on transient failures and falling back to the archived copy
+   * of the file when the original location no longer exists. Lease-recovery and HDFS-4380 NPE
+   * failures are retried up to 30 times, 2 seconds apart.
+   * @param path path to WAL file
+   * @param conf configuration
+   * @return WAL Reader instance
+   * @throws IOException if the file is missing from both locations, the calling thread is
+   *           interrupted between attempts, or every attempt fails
+   */
+  public static org.apache.hadoop.hbase.wal.WAL.Reader openReader(Path path, Configuration conf)
+      throws IOException {
+    final long retryInterval = 2000; // 2 sec
+    final int maxAttempts = 30;
+    Exception ee = null;
+    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+      try {
+        return WALFactory.createReader(path.getFileSystem(conf), path, conf);
+      } catch (FileNotFoundException fnfe) {
+        // If the log was archived, continue reading from there
+        Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(path, conf);
+        // Compare by value: identity would treat an equal but freshly built archive path as a
+        // new location and recurse without bound when path already points at the archive.
+        if (path.equals(archivedLog)) {
+          throw fnfe;
+        }
+        return openReader(archivedLog, conf);
+      } catch (LeaseNotRecoveredException lnre) {
+        // HBASE-15019 the WAL was not closed due to some hiccup.
+        LOG.warn("Try to recover the WAL lease " + path, lnre);
+        recoverLease(conf, path);
+        ee = lnre;
+      } catch (NullPointerException npe) {
+        // Workaround for race condition in HDFS-4380
+        // which throws a NPE if we open a file before any data node has the most recent block
+        // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
+        LOG.warn("Got NPE opening reader, will retry.");
+        ee = npe;
+      }
+      if (attempt == maxAttempts) {
+        break; // out of attempts: report the failure without a pointless final sleep
+      }
+      // sleep before next attempt
+      try {
+        Thread.sleep(retryInterval);
+      } catch (InterruptedException ie) {
+        // Restore the flag and stop retrying so the caller can observe the interruption.
+        Thread.currentThread().interrupt();
+        java.io.InterruptedIOException iioe =
+            new java.io.InterruptedIOException("Interrupted while opening WAL reader " + path);
+        iioe.initCause(ie);
+        throw iioe;
+      }
+    }
+    throw new IOException("Could not open reader", ee);
+  }
+
+  // For HBASE-15019: best-effort lease recovery; an IOException is logged, not propagated.
+  private static void recoverLease(final Configuration conf, final Path path) {
+    try {
+      final FileSystem fs = FSUtils.getCurrentFileSystem(conf);
+      // The callback logs each progress tick and always returns true.
+      FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, new CancelableProgressable() {
+        @Override
+        public boolean progress() {
+          LOG.debug("Still trying to recover WAL lease: " + path);
+          return true;
+        }
+      });
+    } catch (IOException e) {
+      LOG.warn("unable to recover lease for WAL: " + path, e);
+    }
+  }
+
+
+  /**
    * Get prefix of the log from its name, assuming WAL name in format of
    * log_prefix.filenumber.log_suffix
    * @param name Name of the WAL to parse