You are viewing a plain text version of this content. The canonical link was not preserved in this plain text version.
Posted to commits@hbase.apache.org by jx...@apache.org on 2013/04/17 17:53:28 UTC
svn commit: r1468981 - in /hbase/branches/0.95/hbase-server/src:
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/regionserver/wal/
main/java/org/apache/hadoop/hbase/util/
test/java/org/apache/hadoop/hbase/master/ test/...
Author: jxiang
Date: Wed Apr 17 15:53:27 2013
New Revision: 1468981
URL: http://svn.apache.org/r1468981
Log:
HBASE-8321 Log split worker should heartbeat to avoid timeout when the hlog is under recovery
Modified:
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSMapRUtils.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1468981&r1=1468980&r2=1468981&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Wed Apr 17 15:53:27 2013
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.SplitLogT
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -81,13 +82,16 @@ public class SplitLogWorker extends ZooK
private volatile boolean exitWorker;
private final Object grabTaskLock = new Object();
private boolean workerInGrabTask = false;
-
+ private final int report_period;
public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf,
ServerName serverName, TaskExecutor splitTaskExecutor) {
super(watcher);
this.serverName = serverName;
this.splitTaskExecutor = splitTaskExecutor;
+ report_period = conf.getInt("hbase.splitlog.report.period",
+ conf.getInt("hbase.splitlog.manager.timeout",
+ SplitLogManager.DEFAULT_TIMEOUT) / 2);
}
public SplitLogWorker(ZooKeeperWatcher watcher, final Configuration conf,
@@ -274,15 +278,22 @@ public class SplitLogWorker extends ZooK
status = splitTaskExecutor.exec(ZKSplitLog.getFileName(currentTask),
new CancelableProgressable() {
+ private long last_report_at = 0;
+
@Override
public boolean progress() {
- if (!attemptToOwnTask(false)) {
- LOG.warn("Failed to heartbeat the task" + currentTask);
- return false;
+ long t = EnvironmentEdgeManager.currentTimeMillis();
+ if ((t - last_report_at) > report_period) {
+ last_report_at = t;
+ if (!attemptToOwnTask(false)) {
+ LOG.warn("Failed to heartbeat the task" + currentTask);
+ return false;
+ }
}
return true;
}
});
+
switch (status) {
case DONE:
endTask(new SplitLogTask.Done(this.serverName), SplitLogCounters.tot_wkr_task_done);
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java?rev=1468981&r1=1468980&r2=1468981&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java Wed Apr 17 15:53:27 2013
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
public class HLogFactory {
@@ -70,6 +71,11 @@ public class HLogFactory {
logReaderClass = null;
}
+ public static HLog.Reader createReader(final FileSystem fs,
+ final Path path, Configuration conf) throws IOException {
+ return createReader(fs, path, conf, null);
+ }
+
/**
* Create a reader for the WAL. If you are reading from a file that's being written to
* and need to reopen it multiple times, use {@link HLog.Reader#reset()} instead of this method
@@ -77,8 +83,8 @@ public class HLogFactory {
* @return A WAL reader. Close when done with it.
* @throws IOException
*/
- public static HLog.Reader createReader(final FileSystem fs,
- final Path path, Configuration conf) throws IOException {
+ public static HLog.Reader createReader(final FileSystem fs, final Path path,
+ Configuration conf, CancelableProgressable reporter) throws IOException {
if (logReaderClass == null) {
logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
SequenceFileLogReader.class, Reader.class);
@@ -102,6 +108,9 @@ public class HLogFactory {
if (++nbAttempt == 1) {
LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
}
+ if (reporter != null && !reporter.progress()) {
+ throw new InterruptedIOException("Operation is cancelled");
+ }
if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTimeMillis()) {
LOG.error("Can't open after " + nbAttempt + " attempts and "
+ (EnvironmentEdgeManager.currentTimeMillis() - startWaiting)
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1468981&r1=1468980&r2=1468981&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Wed Apr 17 15:53:27 2013
@@ -55,7 +55,6 @@ import org.apache.hadoop.hbase.HTableDes
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.exceptions.OrphanHLogAfterSplitException;
import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -113,9 +112,6 @@ public class HLogSplitter {
private MonitoredTask status;
- // Used in distributed log splitting
- private DistributedLogSplittingHelper distributedLogSplittingHelper = null;
-
// For checking the latest flushed sequence id
protected final LastSequenceId sequenceIdChecker;
@@ -263,10 +259,6 @@ public class HLogSplitter {
return outputSink.getOutputCounts();
}
- void setDistributedLogSplittingHelper(DistributedLogSplittingHelper helper) {
- this.distributedLogSplittingHelper = helper;
- }
-
/**
* Splits the HLog edits in the given list of logfiles (that are a mix of edits
* on multiple regions) by region and then splits them per region directories,
@@ -317,7 +309,7 @@ public class HLogSplitter {
//meta only. However, there is a sequence number that can be obtained
//only by parsing.. so we parse for all files currently
//TODO: optimize this part somehow
- in = getReader(fs, log, conf, skipErrors);
+ in = getReader(fs, log, conf, skipErrors, null);
if (in != null) {
parseHLog(in, logPath, entryBuffers, fs, conf, skipErrors);
}
@@ -420,53 +412,58 @@ public class HLogSplitter {
CancelableProgressable reporter) throws IOException {
boolean isCorrupted = false;
Preconditions.checkState(status == null);
- status = TaskMonitor.get().createStatus(
- "Splitting log file " + logfile.getPath() +
- "into a temporary staging area.");
boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
- HLog.SPLIT_SKIP_ERRORS_DEFAULT);
+ HLog.SPLIT_SKIP_ERRORS_DEFAULT);
int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
Path logPath = logfile.getPath();
- long logLength = logfile.getLen();
- LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
- status.setStatus("Opening log file");
- Reader in = null;
- try {
- in = getReader(fs, logfile, conf, skipErrors);
- } catch (CorruptedLogFileException e) {
- LOG.warn("Could not get reader, corrupted log file " + logPath, e);
- ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
- isCorrupted = true;
- }
- if (in == null) {
- status.markComplete("Was nothing to split in log file");
- LOG.warn("Nothing to split in log file " + logPath);
- return true;
- }
- this.setDistributedLogSplittingHelper(new DistributedLogSplittingHelper(reporter));
- if (!reportProgressIfIsDistributedLogSplitting()) {
- return false;
- }
+ boolean outputSinkStarted = false;
boolean progress_failed = false;
- int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
- int numOpenedFilesLastCheck = 0;
- outputSink.startWriterThreads();
- // Report progress every so many edits and/or files opened (opening a file
- // takes a bit of time).
- Map<byte[], Long> lastFlushedSequenceIds =
- new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
- Entry entry;
int editsCount = 0;
int editsSkipped = 0;
+
try {
+ status = TaskMonitor.get().createStatus(
+ "Splitting log file " + logfile.getPath() +
+ "into a temporary staging area.");
+ long logLength = logfile.getLen();
+ LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
+ status.setStatus("Opening log file");
+ if (reporter != null && !reporter.progress()) {
+ progress_failed = true;
+ return false;
+ }
+ Reader in = null;
+ try {
+ in = getReader(fs, logfile, conf, skipErrors, reporter);
+ } catch (CorruptedLogFileException e) {
+ LOG.warn("Could not get reader, corrupted log file " + logPath, e);
+ ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
+ isCorrupted = true;
+ }
+ if (in == null) {
+ status.markComplete("Was nothing to split in log file");
+ LOG.warn("Nothing to split in log file " + logPath);
+ return true;
+ }
+ int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
+ int numOpenedFilesLastCheck = 0;
+ outputSink.setReporter(reporter);
+ outputSink.startWriterThreads();
+ outputSinkStarted = true;
+ // Report progress every so many edits and/or files opened (opening a file
+ // takes a bit of time).
+ Map<byte[], Long> lastFlushedSequenceIds =
+ new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+ Entry entry;
+
while ((entry = getNextLogLine(in,logPath, skipErrors)) != null) {
byte[] region = entry.getKey().getEncodedRegionName();
Long lastFlushedSequenceId = -1l;
if (sequenceIdChecker != null) {
lastFlushedSequenceId = lastFlushedSequenceIds.get(region);
if (lastFlushedSequenceId == null) {
- lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
- lastFlushedSequenceIds.put(region, lastFlushedSequenceId);
+ lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
+ lastFlushedSequenceIds.put(region, lastFlushedSequenceId);
}
}
if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
@@ -482,7 +479,8 @@ public class HLogSplitter {
String countsStr = (editsCount - editsSkipped) +
" edits, skipped " + editsSkipped + " edits.";
status.setStatus("Split " + countsStr);
- if (!reportProgressIfIsDistributedLogSplitting()) {
+ if (reporter != null && !reporter.progress()) {
+ progress_failed = true;
return false;
}
}
@@ -500,12 +498,13 @@ public class HLogSplitter {
throw e;
} finally {
LOG.info("Finishing writing output logs and closing down.");
- progress_failed = outputSink.finishWritingAndClose() == null;
+ if (outputSinkStarted) {
+ progress_failed = outputSink.finishWritingAndClose() == null;
+ }
String msg = "Processed " + editsCount + " edits across "
- + outputSink.getOutputCounts().size() + " regions; log file="
- + logPath + " is corrupted = " + isCorrupted + " progress failed = "
- + progress_failed;
- ;
+ + outputSink.getOutputCounts().size() + " regions; log file="
+ + logPath + " is corrupted = " + isCorrupted + " progress failed = "
+ + progress_failed;
LOG.info(msg);
status.markComplete(msg);
}
@@ -620,6 +619,7 @@ public class HLogSplitter {
* @return Path to file into which to dump split log edits.
* @throws IOException
*/
+ @SuppressWarnings("deprecation")
static Path getRegionSplitEditsPath(final FileSystem fs,
final Entry logEntry, final Path rootDir, boolean isCreate)
throws IOException {
@@ -724,7 +724,7 @@ public class HLogSplitter {
* @throws CorruptedLogFileException
*/
protected Reader getReader(FileSystem fs, FileStatus file, Configuration conf,
- boolean skipErrors)
+ boolean skipErrors, CancelableProgressable reporter)
throws IOException, CorruptedLogFileException {
Path path = file.getPath();
long length = file.getLen();
@@ -739,9 +739,9 @@ public class HLogSplitter {
}
try {
- FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf);
+ FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
try {
- in = getReader(fs, path, conf);
+ in = getReader(fs, path, conf, reporter);
} catch (EOFException e) {
if (length <= 0) {
// TODO should we ignore an empty, not-last log file if skip.errors
@@ -757,8 +757,8 @@ public class HLogSplitter {
}
}
} catch (IOException e) {
- if (!skipErrors) {
- throw e;
+ if (!skipErrors || e instanceof InterruptedIOException) {
+ throw e; // Don't mark the file corrupted if we were interrupted or if skipErrors is false
}
CorruptedLogFileException t =
new CorruptedLogFileException("skipErrors=true Could not open hlog " +
@@ -826,9 +826,9 @@ public class HLogSplitter {
/**
* Create a new {@link Reader} for reading logs to split.
*/
- protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf)
- throws IOException {
- return HLogFactory.createReader(fs, curLogFile, conf);
+ protected Reader getReader(FileSystem fs, Path curLogFile,
+ Configuration conf, CancelableProgressable reporter) throws IOException {
+ return HLogFactory.createReader(fs, curLogFile, conf, reporter);
}
/**
@@ -1108,56 +1108,6 @@ public class HLogSplitter {
return ret;
}
- /***
- * @return false if it is a distributed log splitting and it failed to report
- * progress
- */
- private boolean reportProgressIfIsDistributedLogSplitting() {
- if (this.distributedLogSplittingHelper != null) {
- return distributedLogSplittingHelper.reportProgress();
- } else {
- return true;
- }
- }
-
- /**
- * A class used in distributed log splitting
- *
- */
- class DistributedLogSplittingHelper {
- // Report progress, only used in distributed log splitting
- private final CancelableProgressable splitReporter;
- // How often to send a progress report (default 1/2 master timeout)
- private final int report_period;
- private long last_report_at = 0;
-
- public DistributedLogSplittingHelper(CancelableProgressable reporter) {
- this.splitReporter = reporter;
- report_period = conf.getInt("hbase.splitlog.report.period",
- conf.getInt("hbase.splitlog.manager.timeout",
- SplitLogManager.DEFAULT_TIMEOUT) / 2);
- }
-
- /***
- * @return false if reporter failed progressing
- */
- private boolean reportProgress() {
- if (splitReporter == null) {
- return true;
- } else {
- long t = EnvironmentEdgeManager.currentTimeMillis();
- if ((t - last_report_at) > report_period) {
- last_report_at = t;
- if (this.splitReporter.progress() == false) {
- LOG.warn("Failed: reporter.progress asked us to terminate");
- return false;
- }
- }
- return true;
- }
- }
- }
-
/**
* Class that manages the output streams from the log splitting process.
*/
@@ -1178,6 +1128,8 @@ public class HLogSplitter {
private final int numThreads;
+ private CancelableProgressable reporter = null;
+
public OutputSink() {
// More threads could potentially write faster at the expense
// of causing more disk seeks as the logs are split.
@@ -1188,6 +1140,10 @@ public class HLogSplitter {
"hbase.regionserver.hlog.splitlog.writer.threads", 3);
}
+ void setReporter(CancelableProgressable reporter) {
+ this.reporter = reporter;
+ }
+
/**
* Start the threads that will pump data from the entryBuffers
* to the output files.
@@ -1213,7 +1169,7 @@ public class HLogSplitter {
t.finish();
}
for (WriterThread t : writerThreads) {
- if (!progress_failed && !reportProgressIfIsDistributedLogSplitting()) {
+ if (!progress_failed && reporter != null && !reporter.progress()) {
progress_failed = true;
}
try {
@@ -1309,7 +1265,7 @@ public class HLogSplitter {
for (int i = 0, n = logWriters.size(); i < n; i++) {
Future<Void> future = completionService.take();
future.get();
- if (!progress_failed && !reportProgressIfIsDistributedLogSplitting()) {
+ if (!progress_failed && reporter != null && !reporter.progress()) {
progress_failed = true;
}
}
@@ -1437,8 +1393,6 @@ public class HLogSplitter {
}
}
-
-
/**
* Private data structure that wraps a Writer and its Path,
* also collecting statistics about the data written to this
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java?rev=1468981&r1=1468980&r2=1468981&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java Wed Apr 17 15:53:27 2013
@@ -38,15 +38,15 @@ import java.io.InterruptedIOException;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class FSHDFSUtils extends FSUtils{
+public class FSHDFSUtils extends FSUtils {
private static final Log LOG = LogFactory.getLog(FSHDFSUtils.class);
/**
* Recover the lease from HDFS, retrying multiple times.
*/
@Override
- public void recoverFileLease(final FileSystem fs, final Path p, Configuration conf)
- throws IOException {
+ public void recoverFileLease(final FileSystem fs, final Path p,
+ Configuration conf, CancelableProgressable reporter) throws IOException {
// lease recovery not needed for local file system case.
if (!(fs instanceof DistributedFileSystem)) {
return;
@@ -81,6 +81,9 @@ public class FSHDFSUtils extends FSUtils
", retrying.", e);
}
if (!recovered) {
+ if (reporter != null && !reporter.progress()) {
+ throw new InterruptedIOException("Operation is cancelled");
+ }
// try at least twice.
if (nbAttempt > 2 && recoveryTimeout < EnvironmentEdgeManager.currentTimeMillis()) {
LOG.error("Can't recoverLease after " + nbAttempt + " attempts and " +
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSMapRUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSMapRUtils.java?rev=1468981&r1=1468980&r2=1468981&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSMapRUtils.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSMapRUtils.java Wed Apr 17 15:53:27 2013
@@ -35,9 +35,9 @@ import org.apache.commons.logging.LogFac
public class FSMapRUtils extends FSUtils {
private static final Log LOG = LogFactory.getLog(FSMapRUtils.class);
- public void recoverFileLease(final FileSystem fs, final Path p,
- Configuration conf) throws IOException {
- LOG.info("Recovering file " + p.toString() +
+ public void recoverFileLease(final FileSystem fs, final Path p,
+ Configuration conf, CancelableProgressable reporter) throws IOException {
+ LOG.info("Recovering file " + p.toString() +
" by changing permission to readonly");
FsPermission roPerm = new FsPermission((short) 0444);
fs.setPermission(p, roPerm);
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=1468981&r1=1468980&r2=1468981&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java Wed Apr 17 15:53:27 2013
@@ -204,6 +204,7 @@ public abstract class FSUtils {
* @return output stream to the created file
* @throws IOException if the file cannot be created
*/
+ @SuppressWarnings("deprecation")
public static FSDataOutputStream create(FileSystem fs, Path path,
FsPermission perm, boolean overwrite) throws IOException {
LOG.debug("Creating file=" + path + " with permission=" + perm);
@@ -761,6 +762,7 @@ public abstract class FSUtils {
* @return true if exists
* @throws IOException e
*/
+ @SuppressWarnings("deprecation")
public static boolean metaRegionExists(FileSystem fs, Path rootdir)
throws IOException {
Path rootRegionDir =
@@ -1113,7 +1115,7 @@ public abstract class FSUtils {
* @throws IOException
*/
public abstract void recoverFileLease(final FileSystem fs, final Path p,
- Configuration conf) throws IOException;
+ Configuration conf, CancelableProgressable reporter) throws IOException;
/**
* @param fs
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java?rev=1468981&r1=1468980&r2=1468981&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java Wed Apr 17 15:53:27 2013
@@ -753,7 +753,7 @@ public class RegionSplitter {
} else {
LOG.debug("_balancedSplit file found. Replay log to restore state...");
FSUtils.getInstance(fs, table.getConfiguration())
- .recoverFileLease(fs, splitFile, table.getConfiguration());
+ .recoverFileLease(fs, splitFile, table.getConfiguration(), null);
// parse split file and process remaining splits
FSDataInputStream tmpIn = fs.open(splitFile);
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java?rev=1468981&r1=1468980&r2=1468981&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java Wed Apr 17 15:53:27 2013
@@ -159,6 +159,7 @@ public class TestDistributedLogSplitting
for (HRegionInfo hri : regions) {
Path tdir = HTableDescriptor.getTableDir(rootdir, table);
+ @SuppressWarnings("deprecation")
Path editsdir =
HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
LOG.debug("checking edits dir " + editsdir);
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1468981&r1=1468980&r2=1468981&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Wed Apr 17 15:53:27 2013
@@ -459,7 +459,7 @@ public class TestHLog {
public void run() {
try {
FSUtils.getInstance(fs, rlConf)
- .recoverFileLease(recoveredFs, walPath, rlConf);
+ .recoverFileLease(recoveredFs, walPath, rlConf, null);
} catch (IOException e) {
exception = e;
}
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1468981&r1=1468980&r2=1468981&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Wed Apr 17 15:53:27 2013
@@ -36,6 +36,7 @@ import java.util.Map;
import java.util.NavigableSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
@@ -64,6 +65,7 @@ import org.apache.hadoop.hbase.LargeTest
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.CorruptedLogFileException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -918,6 +920,45 @@ public class TestHLogSplit {
}
}
+ @Test
+ public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
+ generateHLogs(1, 10, -1);
+ FileStatus logfile = fs.listStatus(HLOGDIR)[0];
+ fs.initialize(fs.getUri(), conf);
+
+ final AtomicInteger count = new AtomicInteger();
+
+ CancelableProgressable localReporter
+ = new CancelableProgressable() {
+ @Override
+ public boolean progress() {
+ count.getAndIncrement();
+ return false;
+ }
+ };
+
+ FileSystem spiedFs = Mockito.spy(fs);
+ Mockito.doAnswer(new Answer<FSDataInputStream>() {
+ public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
+ Thread.sleep(1500); // Sleep a while and wait for the status report to be invoked
+ return (FSDataInputStream)invocation.callRealMethod();
+ }
+ }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
+
+ try {
+ conf.setInt("hbase.splitlog.report.period", 1000);
+ HLogSplitter s = new HLogSplitter(conf, HBASEDIR, null, null, spiedFs, null);
+ boolean ret = s.splitLogFile(logfile, localReporter);
+ assertFalse("Log splitting should failed", ret);
+ assertTrue(count.get() > 0);
+ } catch (IOException e) {
+ fail("There shouldn't be any exception but: " + e.toString());
+ } finally {
+ // reset it back to its default value
+ conf.setInt("hbase.splitlog.report.period", 59000);
+ }
+ }
+
/**
* Test log split process with fake data and lots of edits to trigger threading
* issues.
@@ -1000,8 +1041,8 @@ public class TestHLogSplit {
/* Produce a mock reader that generates fake entries */
- protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf)
- throws IOException {
+ protected Reader getReader(FileSystem fs, Path curLogFile,
+ Configuration conf, CancelableProgressable reporter) throws IOException {
Reader mockReader = Mockito.mock(Reader.class);
Mockito.doAnswer(new Answer<HLog.Entry>() {
int index = 0;