You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2018/12/05 20:39:26 UTC
hbase git commit: HBASE-21544 Backport HBASE-20734 Colocate recovered
edits directory with hbase.wal.dir
Repository: hbase
Updated Branches:
refs/heads/branch-2.0 8e36aae9d -> 361dea85c
HBASE-21544 Backport HBASE-20734 Colocate recovered edits directory with hbase.wal.dir
JE: Fairly direct backport from >=branch-2.1 to solve an issue where
an over-aggressive check for hflush() breaks Azure-based FileSystems.
Amending-Author: Reid Chan <re...@apache.org>
Signed-off-by: Reid Chan <re...@apache.org>
Signed-off-by: Josh Elser <el...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/361dea85
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/361dea85
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/361dea85
Branch: refs/heads/branch-2.0
Commit: 361dea85c90229d3c2752a814bfc38073f14a2c9
Parents: 8e36aae
Author: Zach York <zy...@apache.org>
Authored: Wed Jun 27 16:18:53 2018 -0700
Committer: Josh Elser <el...@apache.org>
Committed: Wed Dec 5 15:06:03 2018 -0500
----------------------------------------------------------------------
.../apache/hadoop/hbase/util/CommonFSUtils.java | 28 +++
.../assignment/MergeTableRegionsProcedure.java | 8 +-
.../assignment/SplitTableRegionProcedure.java | 10 +-
.../AbstractStateMachineTableProcedure.java | 6 +
.../hadoop/hbase/regionserver/HRegion.java | 159 ++++++++++-----
.../apache/hadoop/hbase/wal/WALSplitter.java | 198 +++++++++----------
.../hadoop/hbase/master/AbstractTestDLS.java | 6 +-
.../hadoop/hbase/regionserver/TestHRegion.java | 8 +-
.../regionserver/wal/AbstractTestWALReplay.java | 8 +-
.../hbase/wal/TestReadWriteSeqIdFiles.java | 18 +-
.../apache/hadoop/hbase/wal/TestWALFactory.java | 2 +-
.../apache/hadoop/hbase/wal/TestWALSplit.java | 58 +++---
12 files changed, 304 insertions(+), 205 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/361dea85/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
index a34048a..a08f9f2 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
@@ -417,6 +417,34 @@ public abstract class CommonFSUtils {
}
/**
+ * Returns the WAL region directory based on the given table name and region name
+ * @param conf configuration to determine WALRootDir
+ * @param tableName Table that the region is under
+ * @param encodedRegionName Region name used for creating the final region directory
+ * @return the region directory used to store WALs under the WALRootDir
+ * @throws IOException if there is an exception determining the WALRootDir
+ */
+ public static Path getWALRegionDir(final Configuration conf,
+ final TableName tableName, final String encodedRegionName)
+ throws IOException {
+ return new Path(getWALTableDir(conf, tableName),
+ encodedRegionName);
+ }
+
+ /**
+ * Returns the Table directory under the WALRootDir for the specified table name
+ * @param conf configuration used to get the WALRootDir
+ * @param tableName Table to get the directory for
+ * @return a path to the WAL table directory for the specified table
+ * @throws IOException if there is an exception determining the WALRootDir
+ */
+ public static Path getWALTableDir(final Configuration conf, final TableName tableName)
+ throws IOException {
+ return new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()),
+ tableName.getQualifierAsString());
+ }
+
+ /**
* Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
* path rootdir
*
http://git-wip-us.apache.org/repos/asf/hbase/blob/361dea85/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
index accc051..a3ec9cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
@@ -824,14 +824,16 @@ public class MergeTableRegionsProcedure
}
private void writeMaxSequenceIdFile(MasterProcedureEnv env) throws IOException {
- FileSystem fs = env.getMasterServices().getMasterFileSystem().getFileSystem();
+ FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem();
long maxSequenceId = -1L;
for (RegionInfo region : regionsToMerge) {
maxSequenceId =
- Math.max(maxSequenceId, WALSplitter.getMaxRegionSequenceId(fs, getRegionDir(env, region)));
+ Math.max(maxSequenceId, WALSplitter.getMaxRegionSequenceId(
+ walFS, getWALRegionDir(env, region)));
}
if (maxSequenceId > 0) {
- WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, mergedRegion), maxSequenceId);
+ WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, mergedRegion),
+ maxSequenceId);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/361dea85/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
index ef732fd..3b81d7a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
@@ -884,12 +884,14 @@ public class SplitTableRegionProcedure
}
private void writeMaxSequenceIdFile(MasterProcedureEnv env) throws IOException {
- FileSystem fs = env.getMasterServices().getMasterFileSystem().getFileSystem();
+ FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem();
long maxSequenceId =
- WALSplitter.getMaxRegionSequenceId(fs, getRegionDir(env, getParentRegion()));
+ WALSplitter.getMaxRegionSequenceId(walFS, getWALRegionDir(env, getParentRegion()));
if (maxSequenceId > 0) {
- WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, daughter_1_RI), maxSequenceId);
- WALSplitter.writeRegionSequenceIdFile(fs, getRegionDir(env, daughter_2_RI), maxSequenceId);
+ WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughter_1_RI),
+ maxSequenceId);
+ WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughter_2_RI),
+ maxSequenceId);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/361dea85/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java
index 8c13ef4..fa77fda 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java
@@ -139,6 +139,12 @@ public abstract class AbstractStateMachineTableProcedure<TState>
return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
}
+ protected final Path getWALRegionDir(MasterProcedureEnv env, RegionInfo region)
+ throws IOException {
+ return FSUtils.getWALRegionDir(env.getMasterConfiguration(),
+ region.getTable(), region.getEncodedName());
+ }
+
/**
* Check that cluster is up and master is running. Check table is modifiable.
* If <code>enabled</code>, check table is enabled else check it is disabled.
http://git-wip-us.apache.org/repos/asf/hbase/blob/361dea85/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 298e22a..6d7145a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -326,6 +326,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private final int rowLockWaitDuration;
static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
+ private Path regionDir;
+ private FileSystem walFS;
+
// The internal wait duration to acquire a lock before read/update
// from the region. It is not per row. The purpose of this wait time
// is to avoid waiting a long time while the region is busy, so that
@@ -927,7 +930,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
stores.forEach(HStore::startReplayingFromWAL);
// Recover any edits if available.
maxSeqId = Math.max(maxSeqId,
- replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
+ replayRecoveredEditsIfAny(maxSeqIdInStores, reporter, status));
// Make sure mvcc is up to max.
this.mvcc.advanceTo(maxSeqId);
} finally {
@@ -970,14 +973,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Use maximum of log sequenceid or that which was found in stores
// (particularly if no recovered edits, seqid will be -1).
long maxSeqIdFromFile =
- WALSplitter.getMaxRegionSequenceId(fs.getFileSystem(), fs.getRegionDir());
+ WALSplitter.getMaxRegionSequenceId(getWalFileSystem(), getWALRegionDir());
long nextSeqId = Math.max(maxSeqId, maxSeqIdFromFile) + 1;
// The openSeqNum will always be increase even for read only region, as we rely on it to
// determine whether a region has been successfully reopend, so here we always need to update
// the max sequence id file.
if (RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
LOG.debug("writing seq id for {}", this.getRegionInfo().getEncodedName());
- WALSplitter.writeRegionSequenceIdFile(fs.getFileSystem(), fs.getRegionDir(), nextSeqId);
+ WALSplitter.writeRegionSequenceIdFile(fs.getFileSystem(), getWALRegionDir(), nextSeqId);
+ //WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(), nextSeqId - 1);
}
LOG.info("Opened {}; next sequenceid={}", this.getRegionInfo().getShortNameToLog(), nextSeqId);
@@ -1123,11 +1127,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionEventDesc,
mvcc);
- // Store SeqId in HDFS when a region closes
+ // Store SeqId in WAL FileSystem when a region closes
// checking region folder exists is due to many tests which delete the table folder while a
// table is still online
- if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) {
- WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(),
+ if (getWalFileSystem().exists(getWALRegionDir())) {
+ WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(),
mvcc.getReadPoint());
}
}
@@ -1846,6 +1850,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return this.fs;
}
+ /** @return the WAL {@link HRegionFileSystem} used by this region */
+ HRegionFileSystem getRegionWALFileSystem() throws IOException {
+ return new HRegionFileSystem(conf, getWalFileSystem(),
+ FSUtils.getWALTableDir(conf, htableDescriptor.getTableName()), fs.getRegionInfo());
+ }
+
+ /** @return the WAL {@link FileSystem} being used by this region */
+ FileSystem getWalFileSystem() throws IOException {
+ if (walFS == null) {
+ walFS = FSUtils.getWALFileSystem(conf);
+ }
+ return walFS;
+ }
+
+ /**
+ * @return the Region directory under WALRootDirectory
+ * @throws IOException if there is an error getting WALRootDir
+ */
+ @VisibleForTesting
+ public Path getWALRegionDir() throws IOException {
+ if (regionDir == null) {
+ regionDir = FSUtils.getWALRegionDir(conf, getRegionInfo().getTable(),
+ getRegionInfo().getEncodedName());
+ }
+ return regionDir;
+ }
+
@Override
public long getEarliestFlushTimeForAllStores() {
return Collections.min(lastStoreFlushTimeMap.values());
@@ -4381,8 +4412,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
* @throws IOException
*/
- protected long replayRecoveredEditsIfAny(final Path regiondir,
- Map<byte[], Long> maxSeqIdInStores,
+ protected long replayRecoveredEditsIfAny(Map<byte[], Long> maxSeqIdInStores,
final CancelableProgressable reporter, final MonitoredTask status)
throws IOException {
long minSeqIdForTheRegion = -1;
@@ -4393,14 +4423,75 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
long seqid = minSeqIdForTheRegion;
- FileSystem fs = this.fs.getFileSystem();
- NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
+ FileSystem walFS = getWalFileSystem();
+ FileSystem rootFS = getFilesystem();
+ Path regionDir = getWALRegionDir();
+ Path defaultRegionDir = getRegionDir(FSUtils.getRootDir(conf), getRegionInfo());
+
+  // This is to ensure backward compatibility with HBASE-20723 where recovered edits can appear
+  // under the root dir even if walDir is set.
+ NavigableSet<Path> filesUnderRootDir = null;
+ if (!regionDir.equals(defaultRegionDir)) {
+ filesUnderRootDir =
+ WALSplitter.getSplitEditFilesSorted(rootFS, defaultRegionDir);
+ seqid = Math.max(seqid,
+ replayRecoveredEditsForPaths(minSeqIdForTheRegion, rootFS, filesUnderRootDir, reporter,
+ defaultRegionDir));
+ }
+
+ NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(walFS, regionDir);
+ seqid = Math.max(seqid, replayRecoveredEditsForPaths(minSeqIdForTheRegion, walFS,
+ files, reporter, regionDir));
+
+ if (seqid > minSeqIdForTheRegion) {
+ // Then we added some edits to memory. Flush and cleanup split edit files.
+ internalFlushcache(null, seqid, stores.values(), status, false, FlushLifeCycleTracker.DUMMY);
+ }
+ // Now delete the content of recovered edits. We're done w/ them.
+ if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
+ // For debugging data loss issues!
+ // If this flag is set, make use of the hfile archiving by making recovered.edits a fake
+ // column family. Have to fake out file type too by casting our recovered.edits as storefiles
+ String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regionDir).getName();
+ Set<HStoreFile> fakeStoreFiles = new HashSet<>(files.size());
+ for (Path file: files) {
+ fakeStoreFiles.add(
+ new HStoreFile(walFS, file, this.conf, null, null, true));
+ }
+ getRegionWALFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
+ } else {
+ if (filesUnderRootDir != null) {
+ for (Path file : filesUnderRootDir) {
+ if (!rootFS.delete(file, false)) {
+ LOG.error("Failed delete of {} from under the root directory.", file);
+ } else {
+ LOG.debug("Deleted recovered.edits under root directory. file=" + file);
+ }
+ }
+ }
+ for (Path file: files) {
+ if (!walFS.delete(file, false)) {
+ LOG.error("Failed delete of " + file);
+ } else {
+ LOG.debug("Deleted recovered.edits file=" + file);
+ }
+ }
+ }
+ return seqid;
+ }
+
+ private long replayRecoveredEditsForPaths(long minSeqIdForTheRegion, FileSystem fs,
+ final NavigableSet<Path> files, final CancelableProgressable reporter, final Path regionDir)
+ throws IOException {
+ long seqid = minSeqIdForTheRegion;
if (LOG.isDebugEnabled()) {
LOG.debug("Found " + (files == null ? 0 : files.size())
- + " recovered edits file(s) under " + regiondir);
+ + " recovered edits file(s) under " + regionDir);
}
- if (files == null || files.isEmpty()) return seqid;
+ if (files == null || files.isEmpty()) {
+ return minSeqIdForTheRegion;
+ }
for (Path edits: files) {
if (edits == null || !fs.exists(edits)) {
@@ -4415,8 +4506,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (maxSeqId <= minSeqIdForTheRegion) {
if (LOG.isDebugEnabled()) {
String msg = "Maximum sequenceid for this wal is " + maxSeqId
- + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
- + ", skipped the whole file, path=" + edits;
+ + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
+ + ", skipped the whole file, path=" + edits;
LOG.debug(msg);
}
continue;
@@ -4425,7 +4516,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
// replay the edits. Replay can return -1 if everything is skipped, only update
// if seqId is greater
- seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
+ seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter, fs));
} catch (IOException e) {
boolean skipErrors = conf.getBoolean(
HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
@@ -4435,10 +4526,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (conf.get("hbase.skip.errors") != null) {
LOG.warn(
"The property 'hbase.skip.errors' has been deprecated. Please use " +
- HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
+ HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
}
if (skipErrors) {
- Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
+ Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits);
LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
+ "=true so continuing. Renamed " + edits +
" as " + p, e);
@@ -4447,31 +4538,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
}
- if (seqid > minSeqIdForTheRegion) {
- // Then we added some edits to memory. Flush and cleanup split edit files.
- internalFlushcache(null, seqid, stores.values(), status, false, FlushLifeCycleTracker.DUMMY);
- }
- // Now delete the content of recovered edits. We're done w/ them.
- if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
- // For debugging data loss issues!
- // If this flag is set, make use of the hfile archiving by making recovered.edits a fake
- // column family. Have to fake out file type too by casting our recovered.edits as storefiles
- String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName();
- Set<HStoreFile> fakeStoreFiles = new HashSet<>(files.size());
- for (Path file: files) {
- fakeStoreFiles.add(
- new HStoreFile(getRegionFileSystem().getFileSystem(), file, this.conf, null, null, true));
- }
- getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
- } else {
- for (Path file: files) {
- if (!fs.delete(file, false)) {
- LOG.error("Failed delete of " + file);
- } else {
- LOG.debug("Deleted recovered.edits file=" + file);
- }
- }
- }
return seqid;
}
@@ -4485,12 +4551,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @throws IOException
*/
private long replayRecoveredEdits(final Path edits,
- Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
+ Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter, FileSystem fs)
throws IOException {
String msg = "Replaying edits from " + edits;
LOG.info(msg);
MonitoredTask status = TaskMonitor.get().createStatus(msg);
- FileSystem fs = this.fs.getFileSystem();
status.setStatus("Opening recovered edits");
WAL.Reader reader = null;
@@ -4644,7 +4709,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
coprocessorHost.postReplayWALs(this.getRegionInfo(), edits);
}
} catch (EOFException eof) {
- Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
+ Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits);
msg = "EnLongAddered EOF. Most likely due to Master failure during " +
"wal splitting, so we have this data in another edit. " +
"Continuing, but renaming " + edits + " as " + p;
@@ -4654,7 +4719,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// If the IOE resulted from bad file format,
// then this problem is idempotent and retrying won't help
if (ioe.getCause() instanceof ParseException) {
- Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
+ Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits);
msg = "File corruption enLongAddered! " +
"Continuing, but renaming " + edits + " as " + p;
LOG.warn(msg, ioe);
@@ -7938,7 +8003,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
- 50 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
+ 53 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
(14 * Bytes.SIZEOF_LONG) +
3 * Bytes.SIZEOF_BOOLEAN);
http://git-wip-us.apache.org/repos/asf/hbase/blob/361dea85/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 5689a35..9ed7f6c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -115,8 +115,8 @@ public class WALSplitter {
public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
// Parameters for split process
- protected final Path rootDir;
- protected final FileSystem fs;
+ protected final Path walDir;
+ protected final FileSystem walFS;
protected final Configuration conf;
// Major subcomponents of the split process.
@@ -148,15 +148,15 @@ public class WALSplitter {
@VisibleForTesting
- WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
- FileSystem fs, LastSequenceId idChecker,
+ WALSplitter(final WALFactory factory, Configuration conf, Path walDir,
+ FileSystem walFS, LastSequenceId idChecker,
SplitLogWorkerCoordination splitLogWorkerCoordination) {
this.conf = HBaseConfiguration.create(conf);
String codecClassName = conf
.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
- this.rootDir = rootDir;
- this.fs = fs;
+ this.walDir = walDir;
+ this.walFS = walFS;
this.sequenceIdChecker = idChecker;
this.splitLogWorkerCoordination = splitLogWorkerCoordination;
@@ -186,11 +186,11 @@ public class WALSplitter {
* <p>
* @return false if it is interrupted by the progress-able.
*/
- public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
+ public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory)
throws IOException {
- WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, idChecker,
+ WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, idChecker,
splitLogWorkerCoordination);
return s.splitLogFile(logfile, reporter);
}
@@ -201,13 +201,13 @@ public class WALSplitter {
// which uses this method to do log splitting.
@VisibleForTesting
public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
- FileSystem fs, Configuration conf, final WALFactory factory) throws IOException {
+ FileSystem walFS, Configuration conf, final WALFactory factory) throws IOException {
final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
Collections.singletonList(logDir), null);
List<Path> splits = new ArrayList<>();
if (ArrayUtils.isNotEmpty(logfiles)) {
for (FileStatus logfile: logfiles) {
- WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null);
+ WALSplitter s = new WALSplitter(factory, conf, rootDir, walFS, null, null);
if (s.splitLogFile(logfile, null)) {
finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
if (s.outputSink.splits != null) {
@@ -216,7 +216,7 @@ public class WALSplitter {
}
}
}
- if (!fs.delete(logDir, true)) {
+ if (!walFS.delete(logDir, true)) {
throw new IOException("Unable to delete src dir: " + logDir);
}
return splits;
@@ -322,10 +322,10 @@ public class WALSplitter {
LOG.warn("Could not parse, corrupted WAL={}", logPath, e);
if (splitLogWorkerCoordination != null) {
// Some tests pass in a csm of null.
- splitLogWorkerCoordination.markCorrupted(rootDir, logfile.getPath().getName(), fs);
+ splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS);
} else {
// for tests only
- ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
+ ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS);
}
isCorrupted = true;
} catch (IOException e) {
@@ -373,31 +373,30 @@ public class WALSplitter {
*/
public static void finishSplitLogFile(String logfile,
Configuration conf) throws IOException {
- Path rootdir = FSUtils.getWALRootDir(conf);
- Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
+ Path walDir = FSUtils.getWALRootDir(conf);
+ Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path logPath;
- if (FSUtils.isStartingWithPath(rootdir, logfile)) {
+ if (FSUtils.isStartingWithPath(walDir, logfile)) {
logPath = new Path(logfile);
} else {
- logPath = new Path(rootdir, logfile);
+ logPath = new Path(walDir, logfile);
}
- finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
+ finishSplitLogFile(walDir, oldLogDir, logPath, conf);
}
- private static void finishSplitLogFile(Path rootdir, Path oldLogDir,
+ private static void finishSplitLogFile(Path walDir, Path oldLogDir,
Path logPath, Configuration conf) throws IOException {
List<Path> processedLogs = new ArrayList<>();
List<Path> corruptedLogs = new ArrayList<>();
- FileSystem fs;
- fs = rootdir.getFileSystem(conf);
- if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
+ FileSystem walFS = walDir.getFileSystem(conf);
+ if (ZKSplitLog.isCorrupted(walDir, logPath.getName(), walFS)) {
corruptedLogs.add(logPath);
} else {
processedLogs.add(logPath);
}
- archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
- Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
- fs.delete(stagingDir, true);
+ archiveLogs(corruptedLogs, processedLogs, oldLogDir, walFS, conf);
+ Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, logPath.getName());
+ walFS.delete(stagingDir, true);
}
/**
@@ -408,30 +407,30 @@ public class WALSplitter {
* @param corruptedLogs
* @param processedLogs
* @param oldLogDir
- * @param fs
+ * @param walFS WAL FileSystem to archive files on.
* @param conf
* @throws IOException
*/
private static void archiveLogs(
final List<Path> corruptedLogs,
final List<Path> processedLogs, final Path oldLogDir,
- final FileSystem fs, final Configuration conf) throws IOException {
+ final FileSystem walFS, final Configuration conf) throws IOException {
final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}",
corruptDir);
}
- if (!fs.mkdirs(corruptDir)) {
+ if (!walFS.mkdirs(corruptDir)) {
LOG.info("Unable to mkdir {}", corruptDir);
}
- fs.mkdirs(oldLogDir);
+ walFS.mkdirs(oldLogDir);
// this method can get restarted or called multiple times for archiving
// the same log files.
for (Path corrupted : corruptedLogs) {
Path p = new Path(corruptDir, corrupted.getName());
- if (fs.exists(corrupted)) {
- if (!fs.rename(corrupted, p)) {
+ if (walFS.exists(corrupted)) {
+ if (!walFS.rename(corrupted, p)) {
LOG.warn("Unable to move corrupted log {} to {}", corrupted, p);
} else {
LOG.warn("Moved corrupted log {} to {}", corrupted, p);
@@ -441,8 +440,8 @@ public class WALSplitter {
for (Path p : processedLogs) {
Path newPath = AbstractFSWAL.getWALArchivePath(oldLogDir, p);
- if (fs.exists(p)) {
- if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
+ if (walFS.exists(p)) {
+ if (!FSUtils.renameAndSetModifyTime(walFS, p, newPath)) {
LOG.warn("Unable to move {} to {}", p, newPath);
} else {
LOG.info("Archived processed log {} to {}", p, newPath);
@@ -466,36 +465,31 @@ public class WALSplitter {
*/
@SuppressWarnings("deprecation")
@VisibleForTesting
- static Path getRegionSplitEditsPath(final FileSystem fs,
- final Entry logEntry, final Path rootDir, String fileNameBeingSplit)
- throws IOException {
- Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTableName());
+ static Path getRegionSplitEditsPath(final Entry logEntry, String fileNameBeingSplit,
+ Configuration conf) throws IOException {
+ FileSystem walFS = FSUtils.getWALFileSystem(conf);
+ Path tableDir = FSUtils.getWALTableDir(conf, logEntry.getKey().getTableName());
String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
- Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
- Path dir = getRegionDirRecoveredEditsDir(regiondir);
+ Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionName);
+ Path dir = getRegionDirRecoveredEditsDir(regionDir);
- if (!fs.exists(regiondir)) {
- LOG.info("This region's directory does not exist: {}."
- + "It is very likely that it was already split so it is "
- + "safe to discard those edits.", regiondir);
- return null;
- }
- if (fs.exists(dir) && fs.isFile(dir)) {
+
+ if (walFS.exists(dir) && walFS.isFile(dir)) {
Path tmp = new Path("/tmp");
- if (!fs.exists(tmp)) {
- fs.mkdirs(tmp);
+ if (!walFS.exists(tmp)) {
+ walFS.mkdirs(tmp);
}
tmp = new Path(tmp,
HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
LOG.warn("Found existing old file: {}. It could be some "
+ "leftover of an old installation. It should be a folder instead. "
+ "So moving it to {}", dir, tmp);
- if (!fs.rename(dir, tmp)) {
+ if (!walFS.rename(dir, tmp)) {
LOG.warn("Failed to sideline old file {}", dir);
}
}
- if (!fs.exists(dir) && !fs.mkdirs(dir)) {
+ if (!walFS.exists(dir) && !walFS.mkdirs(dir)) {
LOG.warn("mkdir failed on {}", dir);
}
// Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
@@ -533,34 +527,34 @@ public class WALSplitter {
private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
/**
- * @param regiondir
+ * @param regionDir
* This regions directory in the filesystem.
* @return The directory that holds recovered edits files for the region
- * <code>regiondir</code>
+ * <code>regionDir</code>
*/
- public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
- return new Path(regiondir, HConstants.RECOVERED_EDITS_DIR);
+ public static Path getRegionDirRecoveredEditsDir(final Path regionDir) {
+ return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR);
}
/**
* Check whether there is recovered.edits in the region dir
- * @param fs FileSystem
+ * @param walFS FileSystem
* @param conf conf
* @param regionInfo the region to check
* @throws IOException IOException
* @return true if recovered.edits exist in the region dir
*/
- public static boolean hasRecoveredEdits(final FileSystem fs,
+ public static boolean hasRecoveredEdits(final FileSystem walFS,
final Configuration conf, final RegionInfo regionInfo) throws IOException {
// No recovered.edits for non default replica regions
if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
return false;
}
- Path rootDir = FSUtils.getRootDir(conf);
//Only default replica region can reach here, so we can use regioninfo
//directly without converting it to default replica's regioninfo.
- Path regionDir = HRegion.getRegionDir(rootDir, regionInfo);
- NavigableSet<Path> files = getSplitEditFilesSorted(fs, regionDir);
+ Path regionDir = FSUtils.getWALRegionDir(conf, regionInfo.getTable(),
+ regionInfo.getEncodedName());
+ NavigableSet<Path> files = getSplitEditFilesSorted(walFS, regionDir);
return files != null && !files.isEmpty();
}
@@ -569,19 +563,19 @@ public class WALSplitter {
* Returns sorted set of edit files made by splitter, excluding files
* with '.temp' suffix.
*
- * @param fs
- * @param regiondir
- * @return Files in passed <code>regiondir</code> as a sorted set.
+ * @param walFS WAL FileSystem used to retrieve split edits files.
+ * @param regionDir WAL region dir to look for recovered edits files under.
+ * @return Files in passed <code>regionDir</code> as a sorted set.
* @throws IOException
*/
- public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
- final Path regiondir) throws IOException {
+ public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS,
+ final Path regionDir) throws IOException {
NavigableSet<Path> filesSorted = new TreeSet<>();
- Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
- if (!fs.exists(editsdir)) {
+ Path editsdir = getRegionDirRecoveredEditsDir(regionDir);
+ if (!walFS.exists(editsdir)) {
return filesSorted;
}
- FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
+ FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() {
@Override
public boolean accept(Path p) {
boolean result = false;
@@ -591,7 +585,7 @@ public class WALSplitter {
// In particular, on error, we'll move aside the bad edit file giving
// it a timestamp suffix. See moveAsideBadEditsFile.
Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
- result = fs.isFile(p) && m.matches();
+ result = walFS.isFile(p) && m.matches();
// Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
// because it means splitwal thread is writing this file.
if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
@@ -616,17 +610,17 @@ public class WALSplitter {
/**
* Move aside a bad edits file.
*
- * @param fs
+ * @param walFS WAL FileSystem used to rename bad edits file.
* @param edits
* Edits file to move aside.
* @return The name of the moved aside file.
* @throws IOException
*/
- public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
+ public static Path moveAsideBadEditsFile(final FileSystem walFS, final Path edits)
throws IOException {
Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
+ System.currentTimeMillis());
- if (!fs.rename(edits, moveAsideName)) {
+ if (!walFS.rename(edits, moveAsideName)) {
LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
}
return moveAsideName;
@@ -645,12 +639,13 @@ public class WALSplitter {
|| file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
}
- private static FileStatus[] getSequenceIdFiles(FileSystem fs, Path regionDir) throws IOException {
+ private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir)
+ throws IOException {
// TODO: Why are we using a method in here as part of our normal region open where
// there is no splitting involved? Fix. St.Ack 01/20/2017.
Path editsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir);
try {
- FileStatus[] files = fs.listStatus(editsDir, WALSplitter::isSequenceIdFile);
+ FileStatus[] files = walFS.listStatus(editsDir, WALSplitter::isSequenceIdFile);
return files != null ? files : new FileStatus[0];
} catch (FileNotFoundException e) {
return new FileStatus[0];
@@ -674,16 +669,16 @@ public class WALSplitter {
/**
* Get the max sequence id which is stored in the region directory. -1 if none.
*/
- public static long getMaxRegionSequenceId(FileSystem fs, Path regionDir) throws IOException {
- return getMaxSequenceId(getSequenceIdFiles(fs, regionDir));
+ public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException {
+ return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir));
}
/**
* Create a file with name as region's max sequence id
*/
- public static void writeRegionSequenceIdFile(FileSystem fs, Path regionDir, long newMaxSeqId)
+ public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId)
throws IOException {
- FileStatus[] files = getSequenceIdFiles(fs, regionDir);
+ FileStatus[] files = getSequenceIdFiles(walFS, regionDir);
long maxSeqId = getMaxSequenceId(files);
if (maxSeqId > newMaxSeqId) {
throw new IOException("The new max sequence id " + newMaxSeqId +
@@ -694,7 +689,7 @@ public class WALSplitter {
newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX);
if (newMaxSeqId != maxSeqId) {
try {
- if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) {
+ if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) {
throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
}
LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId,
@@ -706,7 +701,7 @@ public class WALSplitter {
// remove old ones
for (FileStatus status : files) {
if (!newSeqIdFile.equals(status.getPath())) {
- fs.delete(status.getPath(), false);
+ walFS.delete(status.getPath(), false);
}
}
}
@@ -733,7 +728,7 @@ public class WALSplitter {
}
try {
- FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
+ FSUtils.getInstance(walFS, conf).recoverFileLease(walFS, path, conf, reporter);
try {
in = getReader(path, reporter);
} catch (EOFException e) {
@@ -800,7 +795,7 @@ public class WALSplitter {
*/
protected Writer createWriter(Path logfile)
throws IOException {
- return walFactory.createRecoveredEditsWriter(fs, logfile);
+ return walFactory.createRecoveredEditsWriter(walFS, logfile);
}
/**
@@ -808,7 +803,7 @@ public class WALSplitter {
* @return new Reader instance, caller should close
*/
protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
- return walFactory.createReader(fs, curLogFile, reporter);
+ return walFactory.createReader(walFS, curLogFile, reporter);
}
/**
@@ -1283,9 +1278,10 @@ public class WALSplitter {
}
// delete the one with fewer wal entries
- private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst) throws IOException {
+ private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst)
+ throws IOException {
long dstMinLogSeqNum = -1L;
- try (WAL.Reader reader = walFactory.createReader(fs, dst)) {
+ try (WAL.Reader reader = walFactory.createReader(walFS, dst)) {
WAL.Entry entry = reader.next();
if (entry != null) {
dstMinLogSeqNum = entry.getKey().getSequenceId();
@@ -1297,15 +1293,15 @@ public class WALSplitter {
if (wap.minLogSeqNum < dstMinLogSeqNum) {
LOG.warn("Found existing old edits file. It could be the result of a previous failed"
+ " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
- + fs.getFileStatus(dst).getLen());
- if (!fs.delete(dst, false)) {
+ + walFS.getFileStatus(dst).getLen());
+ if (!walFS.delete(dst, false)) {
LOG.warn("Failed deleting of old {}", dst);
throw new IOException("Failed deleting of old " + dst);
}
} else {
LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p
- + ", length=" + fs.getFileStatus(wap.p).getLen());
- if (!fs.delete(wap.p, false)) {
+ + ", length=" + walFS.getFileStatus(wap.p).getLen());
+ if (!walFS.delete(wap.p, false)) {
LOG.warn("Failed deleting of {}", wap.p);
throw new IOException("Failed deleting of " + wap.p);
}
@@ -1389,9 +1385,7 @@ public class WALSplitter {
Path closeWriter(String encodedRegionName, WriterAndPath wap,
List<IOException> thrown) throws IOException{
- if (LOG.isTraceEnabled()) {
- LOG.trace("Closing " + wap.p);
- }
+ LOG.trace("Closing " + wap.p);
try {
wap.w.close();
} catch (IOException ioe) {
@@ -1406,7 +1400,7 @@ public class WALSplitter {
}
if (wap.editsWritten == 0) {
// just remove the empty recovered.edits file
- if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
+ if (walFS.exists(wap.p) && !walFS.delete(wap.p, false)) {
LOG.warn("Failed deleting empty " + wap.p);
throw new IOException("Failed deleting empty " + wap.p);
}
@@ -1416,14 +1410,14 @@ public class WALSplitter {
Path dst = getCompletedRecoveredEditsFilePath(wap.p,
regionMaximumEditLogSeqNum.get(encodedRegionName));
try {
- if (!dst.equals(wap.p) && fs.exists(dst)) {
+ if (!dst.equals(wap.p) && walFS.exists(dst)) {
deleteOneWithFewerEntries(wap, dst);
}
// Skip the unit tests which create a splitter that reads and
// writes the data without touching disk.
// TestHLogSplit#testThreading is an example.
- if (fs.exists(wap.p)) {
- if (!fs.rename(wap.p, dst)) {
+ if (walFS.exists(wap.p)) {
+ if (!walFS.rename(wap.p, dst)) {
throw new IOException("Failed renaming " + wap.p + " to " + dst);
}
LOG.info("Rename " + wap.p + " to " + dst);
@@ -1495,7 +1489,7 @@ public class WALSplitter {
if (blacklistedRegions.contains(region)) {
return null;
}
- ret = createWAP(region, entry, rootDir);
+ ret = createWAP(region, entry);
if (ret == null) {
blacklistedRegions.add(region);
return null;
@@ -1509,16 +1503,18 @@ public class WALSplitter {
/**
* @return a path with a write for that path. caller should close.
*/
- WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
- Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, fileBeingSplit.getPath().getName());
+ WriterAndPath createWAP(byte[] region, Entry entry) throws IOException {
+ Path regionedits = getRegionSplitEditsPath(entry,
+ fileBeingSplit.getPath().getName(), conf);
if (regionedits == null) {
return null;
}
- if (fs.exists(regionedits)) {
+ FileSystem walFs = FSUtils.getWALFileSystem(conf);
+ if (walFs.exists(regionedits)) {
LOG.warn("Found old edits file. It could be the "
+ "result of a previous failed split attempt. Deleting " + regionedits + ", length="
- + fs.getFileStatus(regionedits).getLen());
- if (!fs.delete(regionedits, false)) {
+ + walFs.getFileStatus(regionedits).getLen());
+ if (!walFs.delete(regionedits, false)) {
LOG.warn("Failed delete of old {}", regionedits);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/361dea85/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
index bc4d32c..f36b38c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
@@ -66,7 +66,6 @@ import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
-import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.Region;
@@ -221,10 +220,11 @@ public abstract class AbstractTestDLS {
int count = 0;
for (RegionInfo hri : regions) {
- Path tdir = FSUtils.getTableDir(rootdir, table);
+ Path tdir = FSUtils.getWALTableDir(conf, table);
@SuppressWarnings("deprecation")
Path editsdir = WALSplitter
- .getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
+ .getRegionDirRecoveredEditsDir(FSUtils.getWALRegionDir(conf,
+ tableName, hri.getEncodedName()));
LOG.debug("checking edits dir " + editsdir);
FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/361dea85/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 34b7760..d127c7b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -704,7 +704,7 @@ public class TestHRegion {
for (HStore store : region.getStores()) {
maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1);
}
- long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
+ long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
assertEquals(maxSeqId, seqId);
region.getMVCC().advanceTo(seqId);
Get get = new Get(row);
@@ -756,7 +756,7 @@ public class TestHRegion {
for (HStore store : region.getStores()) {
maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1);
}
- long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
+ long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
assertEquals(maxSeqId, seqId);
region.getMVCC().advanceTo(seqId);
Get get = new Get(row);
@@ -801,7 +801,7 @@ public class TestHRegion {
for (HStore store : region.getStores()) {
maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId);
}
- long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, null);
+ long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, null);
assertEquals(minSeqId, seqId);
} finally {
HBaseTestingUtility.closeRegionAndWAL(this.region);
@@ -859,7 +859,7 @@ public class TestHRegion {
for (HStore store : region.getStores()) {
maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1);
}
- long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
+ long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
assertEquals(maxSeqId, seqId);
// assert that the files are flushed
http://git-wip-us.apache.org/repos/asf/hbase/blob/361dea85/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index d20188a..aec001e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -869,7 +869,7 @@ public abstract class AbstractTestWALReplay {
final TableName tableName = TableName.valueOf(currentTest.getMethodName());
final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
final Path basedir =
- FSUtils.getTableDir(this.hbaseRootDir, tableName);
+ FSUtils.getWALTableDir(conf, tableName);
deleteDir(basedir);
final byte[] rowName = tableName.getName();
final int countPerFamily = 10;
@@ -902,7 +902,7 @@ public abstract class AbstractTestWALReplay {
WALSplitter.splitLogFile(hbaseRootDir, listStatus[0],
this.fs, this.conf, null, null, null, wals);
FileStatus[] listStatus1 = this.fs.listStatus(
- new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(),
+ new Path(FSUtils.getWALTableDir(conf, tableName), new Path(hri.getEncodedName(),
"recovered.edits")), new PathFilter() {
@Override
public boolean accept(Path p) {
@@ -929,13 +929,13 @@ public abstract class AbstractTestWALReplay {
public void testDatalossWhenInputError() throws Exception {
final TableName tableName = TableName.valueOf("testDatalossWhenInputError");
final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
- final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
+ final Path basedir = FSUtils.getWALTableDir(conf, tableName);
deleteDir(basedir);
final byte[] rowName = tableName.getName();
final int countPerFamily = 10;
final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
- Path regionDir = region1.getRegionFileSystem().getRegionDir();
+ Path regionDir = region1.getWALRegionDir();
HBaseTestingUtility.closeRegionAndWAL(region1);
WAL wal = createWAL(this.conf, hbaseRootDir, logName);
http://git-wip-us.apache.org/repos/asf/hbase/blob/361dea85/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java
index 6e3aa10..8ae638c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java
@@ -49,13 +49,13 @@ public class TestReadWriteSeqIdFiles {
private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
- private static FileSystem FS;
+ private static FileSystem walFS;
private static Path REGION_DIR;
@BeforeClass
public static void setUp() throws IOException {
- FS = FileSystem.getLocal(UTIL.getConfiguration());
+ walFS = FileSystem.getLocal(UTIL.getConfiguration());
REGION_DIR = UTIL.getDataTestDir();
}
@@ -66,20 +66,20 @@ public class TestReadWriteSeqIdFiles {
@Test
public void test() throws IOException {
- WALSplitter.writeRegionSequenceIdFile(FS, REGION_DIR, 1000L);
- assertEquals(1000L, WALSplitter.getMaxRegionSequenceId(FS, REGION_DIR));
- WALSplitter.writeRegionSequenceIdFile(FS, REGION_DIR, 2000L);
- assertEquals(2000L, WALSplitter.getMaxRegionSequenceId(FS, REGION_DIR));
+ WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 1000L);
+ assertEquals(1000L, WALSplitter.getMaxRegionSequenceId(walFS, REGION_DIR));
+ WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 2000L);
+ assertEquals(2000L, WALSplitter.getMaxRegionSequenceId(walFS, REGION_DIR));
// can not write a sequence id which is smaller
try {
- WALSplitter.writeRegionSequenceIdFile(FS, REGION_DIR, 1500L);
+ WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 1500L);
} catch (IOException e) {
// expected
LOG.info("Expected error", e);
}
Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(REGION_DIR);
- FileStatus[] files = FSUtils.listStatus(FS, editsdir, new PathFilter() {
+ FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() {
@Override
public boolean accept(Path p) {
return WALSplitter.isSequenceIdFile(p);
@@ -89,7 +89,7 @@ public class TestReadWriteSeqIdFiles {
assertEquals(1, files.length);
// verify all seqId files aren't treated as recovered.edits files
- NavigableSet<Path> recoveredEdits = WALSplitter.getSplitEditFilesSorted(FS, REGION_DIR);
+ NavigableSet<Path> recoveredEdits = WALSplitter.getSplitEditFilesSorted(walFS, REGION_DIR);
assertEquals(0, recoveredEdits.size());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/361dea85/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index e8161f4..f96a1d6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -179,7 +179,7 @@ public class TestWALFactory {
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
final int howmany = 3;
RegionInfo[] infos = new RegionInfo[3];
- Path tabledir = FSUtils.getTableDir(hbaseWALDir, tableName);
+ Path tabledir = FSUtils.getWALTableDir(conf, tableName);
fs.mkdirs(tabledir);
for (int i = 0; i < howmany; i++) {
infos[i] = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i))
http://git-wip-us.apache.org/repos/asf/hbase/blob/361dea85/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index f5800df..2036027 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -250,9 +250,9 @@ public class TestWALSplit {
}
LOG.debug(Objects.toString(ls));
LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
- WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
+ WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
LOG.info("Finished splitting out from under zombie.");
- Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+ Path[] logfiles = getLogForRegion(TABLE_NAME, region);
assertEquals("wrong number of split files for region", numWriters, logfiles.length);
int count = 0;
for (Path logfile: logfiles) {
@@ -390,8 +390,8 @@ public class TestWALSplit {
new Entry(new WALKeyImpl(encoded,
TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
new WALEdit());
- Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR,
- FILENAME_BEING_SPLIT);
+ Path p = WALSplitter.getRegionSplitEditsPath(entry,
+ FILENAME_BEING_SPLIT, conf);
String parentOfParent = p.getParent().getParent().getName();
assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
}
@@ -416,8 +416,8 @@ public class TestWALSplit {
assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
fs.createNewFile(parent); // create a recovered.edits file
- Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR,
- FILENAME_BEING_SPLIT);
+ Path p = WALSplitter.getRegionSplitEditsPath(entry,
+ FILENAME_BEING_SPLIT, conf);
String parentOfParent = p.getParent().getParent().getName();
assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
@@ -437,9 +437,9 @@ public class TestWALSplit {
generateWALs(1, 10, -1, 0);
useDifferentDFSClient();
- WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+ WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
- Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+ Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
assertEquals(1, splitLog.length);
assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
@@ -453,9 +453,9 @@ public class TestWALSplit {
generateWALs(1, 10, -1, 100);
useDifferentDFSClient();
- WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+ WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
- Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+ Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
assertEquals(1, splitLog.length);
assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
@@ -480,13 +480,13 @@ public class TestWALSplit {
writer.close();
useDifferentDFSClient();
- WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+ WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
// original log should have 10 test edits, 10 region markers, 1 compaction marker
assertEquals(21, countWAL(originalLog));
- Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, hri.getEncodedName());
+ Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName());
assertEquals(1, splitLog.length);
assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
@@ -501,10 +501,10 @@ public class TestWALSplit {
private int splitAndCount(final int expectedFiles, final int expectedEntries)
throws IOException {
useDifferentDFSClient();
- WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+ WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
int result = 0;
for (String region : REGIONS) {
- Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+ Path[] logfiles = getLogForRegion(TABLE_NAME, region);
assertEquals(expectedFiles, logfiles.length);
int count = 0;
for (Path logfile: logfiles) {
@@ -637,7 +637,7 @@ public class TestWALSplit {
walDirContents.add(status.getPath().getName());
}
useDifferentDFSClient();
- WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+ WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
return walDirContents;
} finally {
conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
@@ -678,9 +678,9 @@ public class TestWALSplit {
corruptWAL(c1, corruption, true);
useDifferentDFSClient();
- WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+ WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
- Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+ Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
assertEquals(1, splitLog.length);
int actualCount = 0;
@@ -714,7 +714,7 @@ public class TestWALSplit {
conf.setBoolean(HBASE_SKIP_ERRORS, false);
generateWALs(-1);
useDifferentDFSClient();
- WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+ WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
}
@@ -730,7 +730,7 @@ public class TestWALSplit {
throws IOException {
generateWALs(-1);
useDifferentDFSClient();
- WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+ WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
FileStatus [] statuses = null;
try {
statuses = fs.listStatus(WALDIR);
@@ -760,7 +760,7 @@ public class TestWALSplit {
try {
InstrumentedLogWriter.activateFailure = true;
- WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+ WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
} catch (IOException e) {
assertTrue(e.getMessage().
contains("This exception is instrumented and should only be thrown for testing"));
@@ -781,7 +781,7 @@ public class TestWALSplit {
Path regiondir = new Path(TABLEDIR, region);
fs.delete(regiondir, true);
- WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+ WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
assertFalse(fs.exists(regiondir));
}
@@ -858,7 +858,7 @@ public class TestWALSplit {
useDifferentDFSClient();
try {
- WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
+ WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
assertFalse(fs.exists(WALDIR));
} catch (IOException e) {
@@ -1082,7 +1082,7 @@ public class TestWALSplit {
Path regiondir = new Path(TABLEDIR, REGION);
LOG.info("Region directory is" + regiondir);
fs.delete(regiondir, true);
- WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+ WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
assertFalse(fs.exists(regiondir));
}
@@ -1095,7 +1095,7 @@ public class TestWALSplit {
injectEmptyFile(".empty", true);
useDifferentDFSClient();
- WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+ WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
assertFalse(fs.exists(tdir));
@@ -1120,7 +1120,7 @@ public class TestWALSplit {
Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
useDifferentDFSClient();
- WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+ WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
assertEquals(1, fs.listStatus(corruptDir).length);
@@ -1148,14 +1148,14 @@ public class TestWALSplit {
@Override
protected Writer createWriter(Path logfile)
throws IOException {
- Writer writer = wals.createRecoveredEditsWriter(this.fs, logfile);
+ Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile);
// After creating writer, simulate region's
// replayRecoveredEditsIfAny() which gets SplitEditFiles of this
// region and delete them, excluding files with '.temp' suffix.
NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
if (files != null && !files.isEmpty()) {
for (Path file : files) {
- if (!this.fs.delete(file, false)) {
+ if (!this.walFS.delete(file, false)) {
LOG.error("Failed delete of " + file);
} else {
LOG.debug("Deleted recovered.edits file=" + file);
@@ -1234,9 +1234,9 @@ public class TestWALSplit {
- private Path[] getLogForRegion(Path rootdir, TableName table, String region)
+ private Path[] getLogForRegion(TableName table, String region)
throws IOException {
- Path tdir = FSUtils.getTableDir(rootdir, table);
+ Path tdir = FSUtils.getWALTableDir(conf, table);
@SuppressWarnings("deprecation")
Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
Bytes.toString(Bytes.toBytes(region))));