Posted to commits@hbase.apache.org by sy...@apache.org on 2017/04/17 20:54:48 UTC
[09/50] [abbrv] hbase git commit: HBASE-14141 HBase Backup/Restore Phase 3: Filter WALs on backup to include only edits from backed up tables (Vladimir Rodionov)
HBASE-14141 HBase Backup/Restore Phase 3: Filter WALs on backup to include only edits from backed up tables (Vladimir Rodionov)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/910b6808
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/910b6808
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/910b6808
Branch: refs/heads/hbase-12439
Commit: 910b68082c8f200f0ba6395a76b7ee1c8917e401
Parents: e916b79
Author: tedyu <yu...@gmail.com>
Authored: Tue Apr 4 18:20:11 2017 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Tue Apr 4 18:20:11 2017 -0700
----------------------------------------------------------------------
.../hadoop/hbase/backup/impl/BackupManager.java | 2 +-
.../backup/impl/IncrementalBackupManager.java | 89 ++++++--
.../impl/IncrementalTableBackupClient.java | 211 +++++++++++--------
.../hbase/backup/impl/RestoreTablesClient.java | 5 +-
.../hbase/backup/impl/TableBackupClient.java | 4 -
.../backup/mapreduce/HFileSplitterJob.java | 2 +-
.../backup/mapreduce/MapReduceRestoreJob.java | 14 +-
.../hadoop/hbase/backup/util/RestoreTool.java | 134 ++----------
.../hadoop/hbase/mapreduce/WALInputFormat.java | 119 +++++++----
.../hadoop/hbase/mapreduce/WALPlayer.java | 10 +-
.../hadoop/hbase/wal/AbstractFSWALProvider.java | 101 +++++++++
11 files changed, 410 insertions(+), 281 deletions(-)
----------------------------------------------------------------------
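At a high level, this patch replaces the old raw-WAL DistCp step of incremental backup with a per-table conversion: WALs are replayed into HFiles with WALPlayer (see walToHFiles()/convertWALsToHFiles() in IncrementalTableBackupClient below), one run per table in the backup set, so only edits from backed-up tables reach the backup image; the resulting HFiles are then copied to the backup root. A minimal sketch of that per-table conversion, with illustrative names (IncrementalConversionSketch, convertTable, bulkRoot are not from the patch):

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.mapreduce.WALPlayer;

    public class IncrementalConversionSketch {
      // walFiles: WAL paths collected since the last backup; bulkRoot: hypothetical staging
      // dir (the patch itself uses <backupRoot>/.tmp/<backupId>/<ns>/<table>/data).
      static void convertTable(Configuration conf, List<String> walFiles, TableName table,
          Path bulkRoot) throws Exception {
        String dirs = String.join(";", walFiles); // ';' because WAL file names contain ','
        conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY,
            new Path(bulkRoot, table.getNameAsString()).toString());
        conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";"); // key added by this patch
        WALPlayer player = new WALPlayer();
        player.setConf(conf);
        if (player.run(new String[] { dirs, table.getNameAsString() }) != 0) {
          throw new IOException("WALPlayer failed for table " + table);
        }
      }
    }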
http://git-wip-us.apache.org/repos/asf/hbase/blob/910b6808/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index c09ce48..f09310f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -466,7 +466,7 @@ public class BackupManager implements Closeable {
/**
* Saves list of WAL files after incremental backup operation. These files will be stored until
- * TTL expiration and are used by Backup Log Cleaner plugin to determine which WAL files can be
+ * TTL expiration and are used by Backup Log Cleaner plug-in to determine which WAL files can be
* safely purged.
*/
public void recordWALFiles(List<String> files) throws IOException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/910b6808/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
index 0f1453e..6330899 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
@@ -21,8 +21,10 @@ package org.apache.hadoop.hbase.backup.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,7 +35,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable.WALItem;
import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
@@ -59,12 +60,10 @@ public class IncrementalBackupManager extends BackupManager {
/**
* Obtain the list of logs that need to be copied out for this incremental backup. The list is set
* in BackupInfo.
- * @param conn the Connection
- * @param backupInfo backup info
- * @return The new HashMap of RS log timestamps after the log roll for this incremental backup.
+ * @return The new HashMap of RS log time stamps after the log roll for this incremental backup.
* @throws IOException exception
*/
- public HashMap<String, Long> getIncrBackupLogFileList(Connection conn, BackupInfo backupInfo)
+ public HashMap<String, Long> getIncrBackupLogFileMap()
throws IOException {
List<String> logList;
HashMap<String, Long> newTimestamps;
@@ -105,40 +104,84 @@ public class IncrementalBackupManager extends BackupManager {
List<WALItem> logFromSystemTable =
getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps, getBackupInfo()
.getBackupRootDir());
- addLogsFromBackupSystemToContext(logFromSystemTable);
-
logList = excludeAlreadyBackedUpWALs(logList, logFromSystemTable);
backupInfo.setIncrBackupFileList(logList);
return newTimestamps;
}
- private List<String> excludeAlreadyBackedUpWALs(List<String> logList,
- List<WALItem> logFromSystemTable) {
+ /**
+ * Get list of WAL files eligible for incremental backup
+ * @return list of WAL files
+ * @throws IOException
+ */
+ public List<String> getIncrBackupLogFileList()
+ throws IOException {
+ List<String> logList;
+ HashMap<String, Long> newTimestamps;
+ HashMap<String, Long> previousTimestampMins;
+
+ String savedStartCode = readBackupStartCode();
+
+ // key: tableName
+ // value: <RegionServer,PreviousTimeStamp>
+ HashMap<TableName, HashMap<String, Long>> previousTimestampMap = readLogTimestampMap();
+
+ previousTimestampMins = BackupUtils.getRSLogTimestampMins(previousTimestampMap);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("StartCode " + savedStartCode + "for backupID " + backupInfo.getBackupId());
+ }
+ // get all new log files from .logs and .oldlogs after last TS and before new timestamp
+ if (savedStartCode == null || previousTimestampMins == null
+ || previousTimestampMins.isEmpty()) {
+ throw new IOException(
+ "Cannot read any previous back up timestamps from backup system table. "
+ + "In order to create an incremental backup, at least one full backup is needed.");
+ }
+
+ newTimestamps = readRegionServerLastLogRollResult();
+
+ logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
+ List<WALItem> logFromSystemTable =
+ getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps, getBackupInfo()
+ .getBackupRootDir());
+
+ logList = excludeAlreadyBackedUpWALs(logList, logFromSystemTable);
+ backupInfo.setIncrBackupFileList(logList);
- List<String> backupedWALList = toWALList(logFromSystemTable);
- logList.removeAll(backupedWALList);
return logList;
}
- private List<String> toWALList(List<WALItem> logFromSystemTable) {
- List<String> list = new ArrayList<String>(logFromSystemTable.size());
- for (WALItem item : logFromSystemTable) {
- list.add(item.getWalFile());
+ private List<String> excludeAlreadyBackedUpWALs(List<String> logList,
+ List<WALItem> logFromSystemTable) {
+
+ Set<String> walFileNameSet = convertToSet(logFromSystemTable);
+
+ List<String> list = new ArrayList<String>();
+ for (int i=0; i < logList.size(); i++) {
+ Path p = new Path(logList.get(i));
+ String name = p.getName();
+ if (walFileNameSet.contains(name)) continue;
+ list.add(logList.get(i));
}
return list;
}
- private void addLogsFromBackupSystemToContext(List<WALItem> logFromSystemTable) {
- List<String> walFiles = new ArrayList<String>();
- for (WALItem item : logFromSystemTable) {
- Path p = new Path(item.getWalFile());
- String walFileName = p.getName();
- String backupId = item.getBackupId();
- String relWALPath = backupId + Path.SEPARATOR + walFileName;
- walFiles.add(relWALPath);
+ /**
+ * Create Set of WAL file names (not full path names)
+ * @param logFromSystemTable
+ * @return set of WAL file names
+ */
+ private Set<String> convertToSet(List<WALItem> logFromSystemTable) {
+
+ Set<String> set = new HashSet<String>();
+ for (int i=0; i < logFromSystemTable.size(); i++) {
+ WALItem item = logFromSystemTable.get(i);
+ set.add(item.walFile);
}
+ return set;
}
/**
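The rewritten excludeAlreadyBackedUpWALs() above compares WALs by bare file name rather than by full path: a WAL recorded in the backup system table may since have moved from the active WAL directory to oldWALs, so a path comparison would miss it. A name-based filter roughly equivalent to the new code (class and method names here are illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;
    import org.apache.hadoop.fs.Path;

    public class WalDedupSketch {
      // candidates: full WAL paths found for this backup; backedUpNames: bare file names
      // already recorded in the backup system table.
      static List<String> excludeByName(List<String> candidates, Set<String> backedUpNames) {
        List<String> result = new ArrayList<>();
        for (String fullPath : candidates) {
          if (!backedUpNames.contains(new Path(fullPath).getName())) {
            result.add(fullPath); // keep only WALs not yet captured by an earlier backup
          }
        }
        return result;
      }
    }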
http://git-wip-us.apache.org/repos/asf/hbase/blob/910b6808/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 8f6f264..3003c93 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.lang.StringUtils;
@@ -34,7 +35,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo;
@@ -45,11 +45,15 @@ import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.util.Tool;
/**
* Incremental backup implementation.
@@ -69,7 +73,8 @@ public class IncrementalTableBackupClient extends TableBackupClient {
FileSystem fs = FileSystem.get(conf);
List<String> list = new ArrayList<String>();
for (String file : incrBackupFileList) {
- if (fs.exists(new Path(file))) {
+ Path p = new Path(file);
+ if (fs.exists(p) || isActiveWalPath(p)) {
list.add(file);
} else {
LOG.warn("Can't find file: " + file);
@@ -78,90 +83,13 @@ public class IncrementalTableBackupClient extends TableBackupClient {
return list;
}
- private List<String> getMissingFiles(List<String> incrBackupFileList) throws IOException {
- FileSystem fs = FileSystem.get(conf);
- List<String> list = new ArrayList<String>();
- for (String file : incrBackupFileList) {
- if (!fs.exists(new Path(file))) {
- list.add(file);
- }
- }
- return list;
-
- }
-
/**
- * Do incremental copy.
- * @param backupInfo backup info
+ * Check if a given path belongs to the active WAL directory
+ * @param p path
+ * @return true, if yes
*/
- private void incrementalCopy(BackupInfo backupInfo) throws Exception {
-
- LOG.info("Incremental copy is starting.");
- // set overall backup phase: incremental_copy
- backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
- // get incremental backup file list and prepare parms for DistCp
- List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
- // filter missing files out (they have been copied by previous backups)
- incrBackupFileList = filterMissingFiles(incrBackupFileList);
- String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]);
- strArr[strArr.length - 1] = backupInfo.getHLogTargetDir();
-
- BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
- int counter = 0;
- int MAX_ITERAIONS = 2;
- while (counter++ < MAX_ITERAIONS) {
- // We run DistCp maximum 2 times
- // If it fails on a second time, we throw Exception
- int res =
- copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
-
- if (res != 0) {
- LOG.error("Copy incremental log files failed with return code: " + res + ".");
- throw new IOException("Failed of Hadoop Distributed Copy from "
- + StringUtils.join(incrBackupFileList, ",") + " to "
- + backupInfo.getHLogTargetDir());
- }
- List<String> missingFiles = getMissingFiles(incrBackupFileList);
-
- if (missingFiles.isEmpty()) {
- break;
- } else {
- // Repeat DistCp, some files have been moved from WALs to oldWALs during previous run
- // update backupInfo and strAttr
- if (counter == MAX_ITERAIONS) {
- String msg =
- "DistCp could not finish the following files: " + StringUtils.join(missingFiles, ",");
- LOG.error(msg);
- throw new IOException(msg);
- }
- List<String> converted = convertFilesFromWALtoOldWAL(missingFiles);
- incrBackupFileList.removeAll(missingFiles);
- incrBackupFileList.addAll(converted);
- backupInfo.setIncrBackupFileList(incrBackupFileList);
-
- // Run DistCp only for missing files (which have been moved from WALs to oldWALs
- // during previous run)
- strArr = converted.toArray(new String[converted.size() + 1]);
- strArr[strArr.length - 1] = backupInfo.getHLogTargetDir();
- }
- }
-
- LOG.info("Incremental copy from " + StringUtils.join(incrBackupFileList, ",") + " to "
- + backupInfo.getHLogTargetDir() + " finished.");
- }
-
- private List<String> convertFilesFromWALtoOldWAL(List<String> missingFiles) throws IOException {
- List<String> list = new ArrayList<String>();
- for (String path : missingFiles) {
- if (path.indexOf(Path.SEPARATOR + HConstants.HREGION_LOGDIR_NAME) < 0) {
- LOG.error("Copy incremental log files failed, file is missing : " + path);
- throw new IOException("Failed of Hadoop Distributed Copy to "
- + backupInfo.getHLogTargetDir() + ", file is missing " + path);
- }
- list.add(path.replace(Path.SEPARATOR + HConstants.HREGION_LOGDIR_NAME, Path.SEPARATOR
- + HConstants.HREGION_OLDLOGDIR_NAME));
- }
- return list;
+ private boolean isActiveWalPath(Path p) {
+ return !AbstractFSWALProvider.isArchivedLogFile(p);
}
static int getIndex(TableName tbl, List<TableName> sTableList) {
@@ -286,7 +214,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
+ backupManager.getIncrementalBackupTableSet());
try {
newTimestamps =
- ((IncrementalBackupManager) backupManager).getIncrBackupLogFileList(conn, backupInfo);
+ ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
} catch (Exception e) {
// fail the overall backup and return
failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
@@ -297,13 +225,16 @@ public class IncrementalTableBackupClient extends TableBackupClient {
try {
// copy out the table and region info files for each table
BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
- incrementalCopy(backupInfo);
+ // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
+ convertWALsToHFiles(backupInfo);
+ incrementalCopyHFiles(backupInfo);
// Save list of WAL files copied
backupManager.recordWALFiles(backupInfo.getIncrBackupFileList());
} catch (Exception e) {
String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId;
// fail the overall backup and return
failBackup(conn, backupInfo, backupManager, e, msg, BackupType.INCREMENTAL, conf);
+ return;
}
// case INCR_BACKUP_COMPLETE:
// set overall backup status: complete. Here we make sure to complete the backup.
@@ -323,8 +254,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
backupManager.readLogTimestampMap();
Long newStartCode =
- BackupUtils.getMinValue(BackupUtils
- .getRSLogTimestampMins(newTableSetTimestampMap));
+ BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
backupManager.writeBackupStartCode(newStartCode);
handleBulkLoad(backupInfo.getTableNames());
@@ -337,4 +267,109 @@ public class IncrementalTableBackupClient extends TableBackupClient {
}
}
+ private void incrementalCopyHFiles(BackupInfo backupInfo) throws Exception {
+
+ try {
+ LOG.debug("Incremental copy HFiles is starting.");
+ // set overall backup phase: incremental_copy
+ backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
+ // get incremental backup file list and prepare parms for DistCp
+ List<String> incrBackupFileList = new ArrayList<String>();
+ // Add Bulk output
+ incrBackupFileList.add(getBulkOutputDir().toString());
+ String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]);
+ strArr[strArr.length - 1] = backupInfo.getBackupRootDir();
+ BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
+ int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
+ if (res != 0) {
+ LOG.error("Copy incremental HFile files failed with return code: " + res + ".");
+ throw new IOException("Failed copy from " + StringUtils.join(incrBackupFileList, ',')
+ + " to " + backupInfo.getHLogTargetDir());
+ }
+ LOG.debug("Incremental copy HFiles from " + StringUtils.join(incrBackupFileList, ',')
+ + " to " + backupInfo.getBackupRootDir() + " finished.");
+ } finally {
+ deleteBulkLoadDirectory();
+ }
+ }
+
+ private void deleteBulkLoadDirectory() throws IOException {
+ // delete original bulk load directory on method exit
+ Path path = getBulkOutputDir();
+ FileSystem fs = FileSystem.get(conf);
+ boolean result = fs.delete(path, true);
+ if (!result) {
+ LOG.warn("Could not delete " + path);
+ }
+
+ }
+
+ private void convertWALsToHFiles(BackupInfo backupInfo) throws IOException {
+ // get incremental backup file list and prepare parameters for DistCp
+ List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
+ // Get list of tables in incremental backup set
+ Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
+ // filter missing files out (they have been copied by previous backups)
+ incrBackupFileList = filterMissingFiles(incrBackupFileList);
+ for (TableName table : tableSet) {
+ // Check if table exists
+ if (tableExists(table, conn)) {
+ walToHFiles(incrBackupFileList, table);
+ } else {
+ LOG.warn("Table " + table + " does not exists. Skipping in WAL converter");
+ }
+ }
+ }
+
+
+ private boolean tableExists(TableName table, Connection conn) throws IOException {
+ try (Admin admin = conn.getAdmin();) {
+ return admin.tableExists(table);
+ }
+ }
+
+ private void walToHFiles(List<String> dirPaths, TableName tableName) throws IOException {
+
+ Tool player = new WALPlayer();
+
+ // Player reads all files in arbitrary directory structure and creates
+ // a Map task for each file. We use ';' as separator
+ // because WAL file names contains ','
+ String dirs = StringUtils.join(dirPaths, ';');
+
+ Path bulkOutputPath = getBulkOutputDirForTable(tableName);
+ conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
+ conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
+ String[] playerArgs = { dirs, tableName.getNameAsString() };
+
+ try {
+ player.setConf(conf);
+ int result = player.run(playerArgs);
+ if(result != 0) {
+ throw new IOException("WAL Player failed");
+ }
+ conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
+ } catch (IOException e) {
+ throw e;
+ } catch (Exception ee) {
+ throw new IOException("Can not convert from directory " + dirs
+ + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee);
+ }
+ }
+
+ private Path getBulkOutputDirForTable(TableName table) {
+ Path tablePath = getBulkOutputDir();
+ tablePath = new Path(tablePath, table.getNamespaceAsString());
+ tablePath = new Path(tablePath, table.getQualifierAsString());
+ return new Path(tablePath, "data");
+ }
+
+ private Path getBulkOutputDir() {
+ String backupId = backupInfo.getBackupId();
+ Path path = new Path(backupInfo.getBackupRootDir());
+ path = new Path(path, ".tmp");
+ path = new Path(path, backupId);
+ return path;
+ }
+
}
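Note also that filterMissingFiles() above no longer drops a WAL just because fs.exists() returns false for its recorded path: a file under the active WAL directory may simply have been archived to oldWALs since it was recorded, and WALInputFormat (further below) can still resolve it there. A hedged rendering of that relaxed check, using only the isActiveWalPath()/isArchivedLogFile() logic referenced in this hunk (class and method names are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;

    public class WalExistenceSketch {
      // Keep a WAL if it is still present, or if its recorded path is under the active WAL
      // directory (it may have moved to oldWALs and will be resolved there at read time).
      static boolean shouldKeep(Configuration conf, Path wal) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        return fs.exists(wal) || !AbstractFSWALProvider.isArchivedLogFile(wal);
      }
    }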
http://git-wip-us.apache.org/repos/asf/hbase/blob/910b6808/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
index 2e4ecce..381e9b1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
@@ -169,8 +169,9 @@ public class RestoreTablesClient {
// full backup path comes first
for (int i = 1; i < images.length; i++) {
BackupImage im = images[i];
- String logBackupDir = HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId());
- dirList.add(new Path(logBackupDir));
+ String fileBackupDir = HBackupFileSystem.getTableBackupDir(im.getRootDir(),
+ im.getBackupId(), sTable)+ Path.SEPARATOR+"data";
+ dirList.add(new Path(fileBackupDir));
}
String dirs = StringUtils.join(dirList, ",");
http://git-wip-us.apache.org/repos/asf/hbase/blob/910b6808/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
index 42a8076..125b5da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
@@ -351,10 +351,6 @@ public abstract class TableBackupClient {
// add and store the manifest for the backup
addManifest(backupInfo, backupManager, type, conf);
- // after major steps done and manifest persisted, do convert if needed for incremental backup
- /* in-fly convert code here, provided by future jira */
- LOG.debug("in-fly convert code here, provided by future jira");
-
// compose the backup complete data
String backupCompleteData =
obtainBackupMetaDataStr(backupInfo) + ",startts=" + backupInfo.getStartTs()
http://git-wip-us.apache.org/repos/asf/hbase/blob/910b6808/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
index 5641720..604e502 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
@@ -107,11 +107,11 @@ public class HFileSplitterJob extends Configured implements Tool {
String inputDirs = args[0];
String tabName = args[1];
conf.setStrings(TABLES_KEY, tabName);
+ conf.set(FileInputFormat.INPUT_DIR, inputDirs);
Job job =
Job.getInstance(conf,
conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
job.setJarByClass(HFileSplitterJob.class);
- FileInputFormat.addInputPaths(job, inputDirs);
job.setInputFormatClass(HFileInputFormat.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
http://git-wip-us.apache.org/repos/asf/hbase/blob/910b6808/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
index 9bafe12..4161ca9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
@@ -62,13 +62,8 @@ public class MapReduceRestoreJob implements RestoreJob {
String bulkOutputConfKey;
- if (fullBackupRestore) {
- player = new HFileSplitterJob();
- bulkOutputConfKey = HFileSplitterJob.BULK_OUTPUT_CONF_KEY;
- } else {
- player = new WALPlayer();
- bulkOutputConfKey = WALPlayer.BULK_OUTPUT_CONF_KEY;
- }
+ player = new HFileSplitterJob();
+ bulkOutputConfKey = HFileSplitterJob.BULK_OUTPUT_CONF_KEY;
// Player reads all files in arbitrary directory structure and creates
// a Map task for each file
String dirs = StringUtils.join(dirPaths, ",");
@@ -88,7 +83,10 @@ public class MapReduceRestoreJob implements RestoreJob {
Path bulkOutputPath = getBulkOutputDir(getFileNameCompatibleString(newTableNames[i]));
Configuration conf = getConf();
conf.set(bulkOutputConfKey, bulkOutputPath.toString());
- String[] playerArgs = { dirs, tableNames[i].getNameAsString() };
+ String[] playerArgs =
+ { dirs,
+ fullBackupRestore? newTableNames[i].getNameAsString():tableNames[i].getNameAsString()
+ };
int result = 0;
int loaderResult = 0;
http://git-wip-us.apache.org/repos/asf/hbase/blob/910b6808/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
index 79adcab..d34701f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
@@ -46,8 +46,6 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
-import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
@@ -63,19 +61,13 @@ import org.apache.hadoop.hbase.util.FSTableDescriptors;
public class RestoreTool {
public static final Log LOG = LogFactory.getLog(BackupUtils.class);
-
- private final String[] ignoreDirs = { HConstants.RECOVERED_EDITS_DIR };
-
private final static long TABLE_AVAILABILITY_WAIT_TIME = 180000;
+ private final String[] ignoreDirs = { HConstants.RECOVERED_EDITS_DIR };
protected Configuration conf = null;
-
protected Path backupRootPath;
-
protected String backupId;
-
protected FileSystem fs;
- private final Path restoreTmpPath;
// store table name and snapshot dir mapping
private final HashMap<TableName, Path> snapshotMap = new HashMap<>();
@@ -86,9 +78,6 @@ public class RestoreTool {
this.backupRootPath = backupRootPath;
this.backupId = backupId;
this.fs = backupRootPath.getFileSystem(conf);
- this.restoreTmpPath =
- new Path(conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
- HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY), "restore");
}
/**
@@ -218,7 +207,7 @@ public class RestoreTool {
public void fullRestoreTable(Connection conn, Path tableBackupPath, TableName tableName,
TableName newTableName, boolean truncateIfExists, String lastIncrBackupId)
throws IOException {
- restoreTableAndCreate(conn, tableName, newTableName, tableBackupPath, truncateIfExists,
+ createAndRestoreTable(conn, tableName, newTableName, tableBackupPath, truncateIfExists,
lastIncrBackupId);
}
@@ -281,48 +270,6 @@ public class RestoreTool {
return tableDescriptor;
}
- /**
- * Duplicate the backup image if it's on local cluster
- * @see HStore#bulkLoadHFile(org.apache.hadoop.hbase.regionserver.StoreFile)
- * @see HRegionFileSystem#bulkLoadStoreFile(String familyName, Path srcPath, long seqNum)
- * @param tableArchivePath archive path
- * @return the new tableArchivePath
- * @throws IOException exception
- */
- Path checkLocalAndBackup(Path tableArchivePath) throws IOException {
- // Move the file if it's on local cluster
- boolean isCopyNeeded = false;
-
- FileSystem srcFs = tableArchivePath.getFileSystem(conf);
- FileSystem desFs = FileSystem.get(conf);
- if (tableArchivePath.getName().startsWith("/")) {
- isCopyNeeded = true;
- } else {
- // This should match what is done in @see HRegionFileSystem#bulkLoadStoreFile(String, Path,
- // long)
- if (srcFs.getUri().equals(desFs.getUri())) {
- LOG.debug("cluster hold the backup image: " + srcFs.getUri() + "; local cluster node: "
- + desFs.getUri());
- isCopyNeeded = true;
- }
- }
- if (isCopyNeeded) {
- LOG.debug("File " + tableArchivePath + " on local cluster, back it up before restore");
- if (desFs.exists(restoreTmpPath)) {
- try {
- desFs.delete(restoreTmpPath, true);
- } catch (IOException e) {
- LOG.debug("Failed to delete path: " + restoreTmpPath
- + ", need to check whether restore target DFS cluster is healthy");
- }
- }
- FileUtil.copy(srcFs, tableArchivePath, desFs, restoreTmpPath, false, conf);
- LOG.debug("Copied to temporary path on local cluster: " + restoreTmpPath);
- tableArchivePath = restoreTmpPath;
- }
- return tableArchivePath;
- }
-
private HTableDescriptor getTableDescriptor(FileSystem fileSys, TableName tableName,
String lastIncrBackupId) throws IOException {
if (lastIncrBackupId != null) {
@@ -334,7 +281,7 @@ public class RestoreTool {
return null;
}
- private void restoreTableAndCreate(Connection conn, TableName tableName, TableName newTableName,
+ private void createAndRestoreTable(Connection conn, TableName tableName, TableName newTableName,
Path tableBackupPath, boolean truncateIfExists, String lastIncrBackupId) throws IOException {
if (newTableName == null) {
newTableName = tableName;
@@ -403,33 +350,13 @@ public class RestoreTool {
// the regions in fine grain
checkAndCreateTable(conn, tableBackupPath, tableName, newTableName, regionPathList,
tableDescriptor, truncateIfExists);
- if (tableArchivePath != null) {
- // start real restore through bulkload
- // if the backup target is on local cluster, special action needed
- Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath);
- if (tempTableArchivePath.equals(tableArchivePath)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath);
- }
- } else {
- regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir
- if (LOG.isDebugEnabled()) {
- LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath);
- }
- }
+ RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);
+ Path[] paths = new Path[regionPathList.size()];
+ regionPathList.toArray(paths);
+ restoreService.run(paths, new TableName[]{tableName}, new TableName[] {newTableName}, true);
- LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false);
- for (Path regionPath : regionPathList) {
- String regionName = regionPath.toString();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Restoring HFiles from directory " + regionName);
- }
- String[] args = { regionName, newTableName.getNameAsString() };
- loader.run(args);
- }
- }
- // we do not recovered edits
} catch (Exception e) {
+ LOG.error(e);
throw new IllegalStateException("Cannot restore hbase table", e);
}
}
@@ -453,28 +380,6 @@ public class RestoreTool {
}
/**
- * Create a {@link LoadIncrementalHFiles} instance to be used to restore the HFiles of a full
- * backup.
- * @return the {@link LoadIncrementalHFiles} instance
- * @throws IOException exception
- */
- private LoadIncrementalHFiles createLoader(Path tableArchivePath, boolean multipleTables)
- throws IOException {
-
- // By default, it is 32 and loader will fail if # of files in any region exceed this
- // limit. Bad for snapshot restore.
- this.conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
- this.conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
- LoadIncrementalHFiles loader = null;
- try {
- loader = new LoadIncrementalHFiles(this.conf);
- } catch (Exception e1) {
- throw new IOException(e1);
- }
- return loader;
- }
-
- /**
* Calculate region boundaries and add all the column families to the table descriptor
* @param regionDirList region dir list
* @return a set of keys to store the boundaries
@@ -591,17 +496,18 @@ public class RestoreTool {
// create table using table descriptor and region boundaries
admin.createTable(htd, keys);
}
- long startTime = EnvironmentEdgeManager.currentTime();
- while (!admin.isTableAvailable(targetTableName, keys)) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- if (EnvironmentEdgeManager.currentTime() - startTime > TABLE_AVAILABILITY_WAIT_TIME) {
- throw new IOException("Time out " + TABLE_AVAILABILITY_WAIT_TIME + "ms expired, table "
- + targetTableName + " is still not available");
- }
+
+ }
+ long startTime = EnvironmentEdgeManager.currentTime();
+ while (!admin.isTableAvailable(targetTableName)) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ if (EnvironmentEdgeManager.currentTime() - startTime > TABLE_AVAILABILITY_WAIT_TIME) {
+ throw new IOException("Time out " + TABLE_AVAILABILITY_WAIT_TIME + "ms expired, table "
+ + targetTableName + " is still not available");
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/910b6808/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
index 0ca78b4..8b4e967 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -35,10 +36,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
-import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
@@ -46,6 +47,7 @@ import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.StringUtils;
/**
@@ -142,56 +144,89 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
Entry currentEntry = new Entry();
private long startTime;
private long endTime;
+ private Configuration conf;
+ private Path logFile;
+ private long currentPos;
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
WALSplit hsplit = (WALSplit)split;
- Path logFile = new Path(hsplit.getLogFileName());
- Configuration conf = context.getConfiguration();
+ logFile = new Path(hsplit.getLogFileName());
+ conf = context.getConfiguration();
LOG.info("Opening reader for "+split);
- try {
- this.reader = WALFactory.createReader(logFile.getFileSystem(conf), logFile, conf);
- } catch (EOFException x) {
- LOG.info("Ignoring corrupted WAL file: " + logFile
- + " (This is normal when a RegionServer crashed.)");
- this.reader = null;
- }
+ openReader(logFile);
this.startTime = hsplit.getStartTime();
this.endTime = hsplit.getEndTime();
}
+ private void openReader(Path path) throws IOException
+ {
+ closeReader();
+ reader = AbstractFSWALProvider.openReader(path, conf);
+ seek();
+ setCurrentPath(path);
+ }
+
+ private void setCurrentPath(Path path) {
+ this.logFile = path;
+ }
+
+ private void closeReader() throws IOException {
+ if (reader != null) {
+ reader.close();
+ reader = null;
+ }
+ }
+
+ private void seek() throws IOException {
+ if (currentPos != 0) {
+ reader.seek(currentPos);
+ }
+ }
+
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (reader == null) return false;
-
+ this.currentPos = reader.getPosition();
Entry temp;
long i = -1;
- do {
- // skip older entries
- try {
- temp = reader.next(currentEntry);
- i++;
- } catch (EOFException x) {
- LOG.warn("Corrupted entry detected. Ignoring the rest of the file."
- + " (This is normal when a RegionServer crashed.)");
+ try {
+ do {
+ // skip older entries
+ try {
+ temp = reader.next(currentEntry);
+ i++;
+ } catch (EOFException x) {
+ LOG.warn("Corrupted entry detected. Ignoring the rest of the file."
+ + " (This is normal when a RegionServer crashed.)");
+ return false;
+ }
+ } while (temp != null && temp.getKey().getWriteTime() < startTime);
+
+ if (temp == null) {
+ if (i > 0) LOG.info("Skipped " + i + " entries.");
+ LOG.info("Reached end of file.");
return false;
+ } else if (i > 0) {
+ LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
+ }
+ boolean res = temp.getKey().getWriteTime() <= endTime;
+ if (!res) {
+ LOG.info("Reached ts: " + temp.getKey().getWriteTime()
+ + " ignoring the rest of the file.");
+ }
+ return res;
+ } catch (IOException e) {
+ Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf);
+ if (logFile != archivedLog) {
+ openReader(archivedLog);
+ // Try call again in recursion
+ return nextKeyValue();
+ } else {
+ throw e;
}
}
- while(temp != null && temp.getKey().getWriteTime() < startTime);
-
- if (temp == null) {
- if (i > 0) LOG.info("Skipped " + i + " entries.");
- LOG.info("Reached end of file.");
- return false;
- } else if (i > 0) {
- LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
- }
- boolean res = temp.getKey().getWriteTime() <= endTime;
- if (!res) {
- LOG.info("Reached ts: " + temp.getKey().getWriteTime() + " ignoring the rest of the file.");
- }
- return res;
}
@Override
@@ -235,6 +270,7 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
+ boolean ignoreMissing = conf.getBoolean(WALPlayer.IGNORE_MISSING_FILES, false);
Path[] inputPaths = getInputPaths(conf);
long startTime = conf.getLong(startKey, Long.MIN_VALUE);
long endTime = conf.getLong(endKey, Long.MAX_VALUE);
@@ -242,8 +278,16 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
List<FileStatus> allFiles = new ArrayList<FileStatus>();
for(Path inputPath: inputPaths){
FileSystem fs = inputPath.getFileSystem(conf);
- List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime);
- allFiles.addAll(files);
+ try {
+ List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime);
+ allFiles.addAll(files);
+ } catch (FileNotFoundException e) {
+ if (ignoreMissing) {
+ LOG.warn("File "+ inputPath +" is missing. Skipping it.");
+ continue;
+ }
+ throw e;
+ }
}
List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size());
for (FileStatus file : allFiles) {
@@ -253,8 +297,9 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
}
private Path[] getInputPaths(Configuration conf) {
- String inpDirs = conf.get("mapreduce.input.fileinputformat.inputdir");
- return StringUtils.stringToPath(inpDirs.split(","));
+ String inpDirs = conf.get(FileInputFormat.INPUT_DIR);
+ return StringUtils.stringToPath(
+ inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ",")));
}
private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
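The record reader above now tracks its read position and, when an IOException is hit mid-file (typically because the RegionServer archived the WAL while the map task was still reading it), reopens the file from its oldWALs location and seeks back before continuing. That fallback, reduced to a standalone helper for illustration (the helper name and error text are not from the patch):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
    import org.apache.hadoop.hbase.wal.WAL;

    public class ArchivedWalFallbackSketch {
      static WAL.Reader reopenArchived(Path activeLog, long pos, Configuration conf)
          throws IOException {
        Path archived = AbstractFSWALProvider.getArchivedLogPath(activeLog, conf);
        if (archived.equals(activeLog)) {
          throw new IOException("WAL " + activeLog + " not found under oldWALs either");
        }
        WAL.Reader reader = AbstractFSWALProvider.openReader(archived, conf);
        if (pos != 0) {
          reader.seek(pos); // resume from the last successfully read entry
        }
        return reader;
      }
    }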
http://git-wip-us.apache.org/repos/asf/hbase/blob/910b6808/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index d16dcf5..d15ffcf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -58,7 +58,7 @@ import org.apache.hadoop.util.ToolRunner;
/**
* A tool to replay WAL files as a M/R job.
* The WAL can be replayed for a set of tables or all tables,
- * and a timerange can be provided (in milliseconds).
+ * and a time range can be provided (in milliseconds).
* The WAL is filtered to the passed set of tables and the output
* can optionally be mapped to another set of tables.
*
@@ -73,6 +73,9 @@ public class WALPlayer extends Configured implements Tool {
public final static String BULK_OUTPUT_CONF_KEY = "wal.bulk.output";
public final static String TABLES_KEY = "wal.input.tables";
public final static String TABLE_MAP_KEY = "wal.input.tablesmap";
+ public final static String INPUT_FILES_SEPARATOR_KEY = "wal.input.separator";
+ public final static String IGNORE_MISSING_FILES = "wal.input.ignore.missing.files";
+
// This relies on Hadoop Configuration to handle warning about deprecated configs and
// to set the correct non-deprecated configs when an old one shows up.
@@ -128,7 +131,9 @@ public class WALPlayer extends Configured implements Tool {
throw new IOException("Exactly one table must be specified for bulk HFile case.");
}
table = Bytes.toBytes(tables[0]);
+
}
+
}
/**
@@ -280,11 +285,10 @@ public class WALPlayer extends Configured implements Tool {
}
conf.setStrings(TABLES_KEY, tables);
conf.setStrings(TABLE_MAP_KEY, tableMap);
+ conf.set(FileInputFormat.INPUT_DIR, inputDirs);
Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + System.currentTimeMillis()));
job.setJarByClass(WALPlayer.class);
- FileInputFormat.addInputPaths(job, inputDirs);
-
job.setInputFormatClass(WALInputFormat.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
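WALPlayer gains two configuration keys here: wal.input.separator (a custom separator for the input list, needed because WAL file names themselves contain commas) and wal.input.ignore.missing.files (skip inputs that disappeared between listing and job submission instead of failing). A hedged standalone usage example; the paths and table name below are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.WALPlayer;
    import org.apache.hadoop.util.ToolRunner;

    public class WalPlayerUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");    // WAL names contain ','
        conf.setBoolean(WALPlayer.IGNORE_MISSING_FILES, true); // tolerate archived/deleted inputs
        conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, "/backup/.tmp/bulk-out"); // placeholder path
        int rc = ToolRunner.run(conf, new WALPlayer(),
            new String[] { "/hbase/oldWALs/wal-1;/hbase/oldWALs/wal-2", "ns:backed_up_table" });
        System.exit(rc);
      }
    }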
http://git-wip-us.apache.org/repos/asf/hbase/blob/910b6808/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index bf14933..28b7fda 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.wal;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -36,6 +37,9 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import com.google.common.annotations.VisibleForTesting;
@@ -374,6 +378,103 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
}
/**
+ * Get the archived WAL file path
+ * @param path - active WAL file path
+ * @param conf - configuration
+ * @return archived path if it exists, the original path otherwise
+ * @throws IOException exception
+ */
+ public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException {
+ Path rootDir = FSUtils.getRootDir(conf);
+ Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ Path archivedLogLocation = new Path(oldLogDir, path.getName());
+ final FileSystem fs = FSUtils.getCurrentFileSystem(conf);
+
+ if (fs.exists(archivedLogLocation)) {
+ LOG.info("Log " + path + " was moved to " + archivedLogLocation);
+ return archivedLogLocation;
+ } else {
+ LOG.error("Couldn't locate log: " + path);
+ return path;
+ }
+ }
+
+ /**
+ * Opens WAL reader with retries and
+ * additional exception handling
+ * @param path path to WAL file
+ * @param conf configuration
+ * @return WAL Reader instance
+ * @throws IOException
+ */
+ public static org.apache.hadoop.hbase.wal.WAL.Reader
+ openReader(Path path, Configuration conf)
+ throws IOException
+
+ {
+ long retryInterval = 2000; // 2 sec
+ int maxAttempts = 30;
+ int attempt = 0;
+ Exception ee = null;
+ org.apache.hadoop.hbase.wal.WAL.Reader reader = null;
+ while (reader == null && attempt++ < maxAttempts) {
+ try {
+ // Detect if this is a new file, if so get a new reader else
+ // reset the current reader so that we see the new data
+ reader = WALFactory.createReader(path.getFileSystem(conf), path, conf);
+ return reader;
+ } catch (FileNotFoundException fnfe) {
+ // If the log was archived, continue reading from there
+ Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(path, conf);
+ if (path != archivedLog) {
+ return openReader(archivedLog, conf);
+ } else {
+ throw fnfe;
+ }
+ } catch (LeaseNotRecoveredException lnre) {
+ // HBASE-15019 the WAL was not closed due to some hiccup.
+ LOG.warn("Try to recover the WAL lease " + path, lnre);
+ recoverLease(conf, path);
+ reader = null;
+ ee = lnre;
+ } catch (NullPointerException npe) {
+ // Workaround for race condition in HDFS-4380
+ // which throws a NPE if we open a file before any data node has the most recent block
+ // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
+ LOG.warn("Got NPE opening reader, will retry.");
+ reader = null;
+ ee = npe;
+ }
+ if (reader == null) {
+ // sleep before next attempt
+ try {
+ Thread.sleep(retryInterval);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ throw new IOException("Could not open reader", ee);
+ }
+
+ // For HBASE-15019
+ private static void recoverLease(final Configuration conf, final Path path) {
+ try {
+ final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
+ FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
+ fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
+ @Override
+ public boolean progress() {
+ LOG.debug("Still trying to recover WAL lease: " + path);
+ return true;
+ }
+ });
+ } catch (IOException e) {
+ LOG.warn("unable to recover lease for WAL: " + path, e);
+ }
+ }
+
+
+ /**
* Get prefix of the log from its name, assuming WAL name in format of
* log_prefix.filenumber.log_suffix
* @param name Name of the WAL to parse
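Callers of the new AbstractFSWALProvider.openReader() get retries (up to 30 attempts, 2 seconds apart), transparent fallback to the oldWALs copy via getArchivedLogPath(), and lease recovery for WALs that were never cleanly closed (HBASE-15019), so a simple read loop needs no extra handling of its own. For illustration only (a hypothetical dump tool; the path argument may already point at an archived WAL):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
    import org.apache.hadoop.hbase.wal.WAL;

    public class WalKeyDumpSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Path wal = new Path(args[0]); // the helper falls back to oldWALs if the file moved
        try (WAL.Reader reader = AbstractFSWALProvider.openReader(wal, conf)) {
          WAL.Entry entry;
          while ((entry = reader.next()) != null) {
            System.out.println(entry.getKey()); // WALKey: region, table, sequence id, write time
          }
        }
      }
    }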