Posted to commits@hbase.apache.org by te...@apache.org on 2013/04/21 02:27:35 UTC
svn commit: r1470253 - in /hbase/branches/0.94/src:
main/java/org/apache/hadoop/hbase/regionserver/wal/
main/java/org/apache/hadoop/hbase/util/
test/java/org/apache/hadoop/hbase/regionserver/wal/
Author: tedyu
Date: Sun Apr 21 00:27:35 2013
New Revision: 1470253
URL: http://svn.apache.org/r1470253
Log:
HBASE-8354 Backport HBASE-7878 'recoverFileLease does not check return value of recoverLease' to 0.94 (Liang Xie)
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
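The gist of HBASE-7878: DistributedFileSystem.recoverLease() returns a boolean telling the caller whether the lease was actually recovered, and the old 0.94 code discarded that result. A minimal sketch of the corrected pattern, distilled from the FSHDFSUtils diff below. The method shape here is illustrative only: 0.94 reaches recoverLease via reflection, the real loop also falls back to append when recoverLease throws, and the usual org.apache.hadoop imports are assumed.

static void recoverLeaseSketch(DistributedFileSystem dfs, Path p, Configuration conf)
    throws IOException, InterruptedException {
  long startWaiting = System.currentTimeMillis();
  long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 300000);
  boolean recovered = false;
  while (!recovered) {
    recovered = dfs.recoverLease(p);           // the fix: honor the return value
    if (!recovered && System.currentTimeMillis() - startWaiting > recoveryTimeout) {
      FSDataOutputStream out = dfs.append(p);  // last resort: an append forces lease recovery
      out.close();
      recovered = true;
    }
    if (!recovered) Thread.sleep(1000);        // poll roughly once a second
  }
}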
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1470253&r1=1470252&r2=1470253&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Sun Apr 21 00:27:35 2013
@@ -34,6 +34,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -177,6 +178,20 @@ public class HLogSplitter {
*/
public List<Path> splitLog()
throws IOException {
+ return splitLog((CountDownLatch) null);
+ }
+
+ /**
+ * Split up a bunch of regionserver commit log files that are no longer being
+ * written to, into new files, one per region, for the region to replay on startup.
+ * Delete the old log files when finished.
+ *
+ * @param latch latch to await before the final check for orphaned logs; may be null
+ * @throws IOException if corrupted hlogs are encountered and aren't tolerated
+ * @return the list of splits
+ */
+ public List<Path> splitLog(CountDownLatch latch)
+ throws IOException {
Preconditions.checkState(!hasSplit,
"An HLogSplitter instance may only be used once");
hasSplit = true;
@@ -200,7 +215,7 @@ public class HLogSplitter {
}
logAndReport("Splitting " + logfiles.length + " hlog(s) in "
+ srcDir.toString());
- splits = splitLog(logfiles);
+ splits = splitLog(logfiles, latch);
splitTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
String msg = "hlog file splitting completed in " + splitTime +
@@ -260,7 +275,8 @@ public class HLogSplitter {
* After the process is complete, the log files are archived to a separate
* directory.
*/
- private List<Path> splitLog(final FileStatus[] logfiles) throws IOException {
+ private List<Path> splitLog(final FileStatus[] logfiles, CountDownLatch latch)
+ throws IOException {
List<Path> processedLogs = new ArrayList<Path>();
List<Path> corruptedLogs = new ArrayList<Path>();
List<Path> splits = null;
@@ -307,10 +323,19 @@ public class HLogSplitter {
}
status.setStatus("Log splits complete. Checking for orphaned logs.");
- if (fs.listStatus(srcDir).length > processedLogs.size()
+ if (latch != null) {
+ try {
+ latch.await();
+ } catch (InterruptedException ie) {
+ LOG.warn("wait for latch interrupted");
+ Thread.currentThread().interrupt();
+ }
+ }
+ FileStatus[] currFiles = fs.listStatus(srcDir);
+ if (currFiles.length > processedLogs.size()
+ corruptedLogs.size()) {
throw new OrphanHLogAfterSplitException(
- "Discovered orphan hlog after split. Maybe the "
+ "Discovered orphan hlog after split. Maybe the "
+ "HRegionServer was not dead when we started");
}
} finally {
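Why splitLog() grew a CountDownLatch: the tests in TestHLogSplit (diff further down) need the splitter to hold its final orphan-log check until a concurrent "zombie" writer has created one more hlog. A rough usage sketch, with a hypothetical createExtraHLog() standing in for the zombie writer's real work:

final CountDownLatch latch = new CountDownLatch(1);
new Thread("zombie-writer") {
  public void run() {
    createExtraHLog();   // hypothetical stand-in for the zombie writer's file creation
    latch.countDown();   // lets the splitter proceed to its orphan-log check
  }
}.start();
splitter.splitLog(latch); // blocks at latch.await() before re-listing srcDir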
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java?rev=1470253&r1=1470252&r2=1470253&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java Sun Apr 21 00:27:35 2013
@@ -51,6 +51,8 @@ public class FSHDFSUtils extends FSUtils
*/
public static final long LEASE_SOFTLIMIT_PERIOD = 60 * 1000;
+ public static final String TEST_TRIGGER_DFS_APPEND = "hbase.test.trigger.dfs.append";
+
@Override
public void recoverFileLease(final FileSystem fs, final Path p, Configuration conf)
throws IOException{
@@ -68,26 +70,37 @@ public class FSHDFSUtils extends FSUtils
// Trying recovery
boolean recovered = false;
+ long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 300000);
+ // conf parameter passed from unit test, indicating whether fs.append() should be triggered
+ boolean triggerAppend = conf.getBoolean(TEST_TRIGGER_DFS_APPEND, false);
+ Exception ex = null;
while (!recovered) {
try {
try {
- if (fs instanceof DistributedFileSystem) {
- DistributedFileSystem dfs = (DistributedFileSystem)fs;
- DistributedFileSystem.class.getMethod("recoverLease",
- new Class[] {Path.class}).invoke(dfs, p);
- } else {
- throw new Exception("Not a DistributedFileSystem");
+ DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ if (triggerAppend) throw new IOException();
+ try {
+ recovered = (Boolean) DistributedFileSystem.class.getMethod(
+ "recoverLease", new Class[] { Path.class }).invoke(dfs, p);
+ if (!recovered) LOG.debug("recoverLease returned false");
+ } catch (InvocationTargetException ite) {
+ // function was properly called, but threw its own exception
+ throw (IOException) ite.getCause();
}
- } catch (InvocationTargetException ite) {
- // function was properly called, but threw it's own exception
- throw (IOException) ite.getCause();
} catch (Exception e) {
LOG.debug("Failed fs.recoverLease invocation, " + e.toString() +
", trying fs.append instead");
+ ex = e;
+ }
+ if (ex != null || System.currentTimeMillis() - startWaiting > recoveryTimeout) {
+ LOG.debug("trying fs.append for " + p + " with " + ex);
+ ex = null; // assume the following append() call would succeed
FSDataOutputStream out = fs.append(p);
out.close();
+ recovered = true;
+ LOG.debug("fs.append passed");
}
- recovered = true;
+ if (recovered) break;
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
if (e instanceof AlreadyBeingCreatedException) {
@@ -111,9 +124,9 @@ public class FSHDFSUtils extends FSUtils
}
try {
Thread.sleep(1000);
- } catch (InterruptedException ex) {
+ } catch (InterruptedException ie) {
InterruptedIOException iioe = new InterruptedIOException();
- iioe.initCause(ex);
+ iioe.initCause(ie);
throw iioe;
}
}
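Net effect of the FSHDFSUtils change: recoverLease() is polled about once a second and its boolean result is honored; the fs.append() fallback now fires only when recoverLease() throws or hbase.lease.recovery.timeout (default 300000 ms) elapses. Unit tests can force the append path up front via the new flag; a small sketch of that knob, matching the TestHLog usage below:

Configuration conf = HBaseConfiguration.create();
conf.setBoolean(FSHDFSUtils.TEST_TRIGGER_DFS_APPEND, true); // bypass recoverLease, go straight to fs.append()
FSUtils.getInstance(fs, conf).recoverFileLease(fs, walPath, conf);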
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1470253&r1=1470252&r2=1470253&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Sun Apr 21 00:27:35 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSHDFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
@@ -101,6 +102,7 @@ public class TestHLog {
// Make block sizes small.
TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
// needed for testAppendClose()
+ TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true);
TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
// quicker heartbeat interval for faster DN death notification
TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000);
@@ -370,17 +372,29 @@ public class TestHLog {
}
}
- // For this test to pass, requires:
- // 1. HDFS-200 (append support)
- // 2. HDFS-988 (SafeMode should freeze file operations
- // [FSNamesystem.nextGenerationStampForBlock])
- // 3. HDFS-142 (on restart, maintain pendingCreates)
+ /*
+ * We pass different values to recoverFileLease() so that different code paths are covered
+ *
+ * For this test to pass, requires:
+ * 1. HDFS-200 (append support)
+ * 2. HDFS-988 (SafeMode should freeze file operations
+ * [FSNamesystem.nextGenerationStampForBlock])
+ * 3. HDFS-142 (on restart, maintain pendingCreates)
+ */
@Test
public void testAppendClose() throws Exception {
+ testAppendClose(true);
+ testAppendClose(false);
+ }
+
+ /*
+ * @param triggerDirectAppend whether to trigger a direct call of fs.append()
+ */
+ public void testAppendClose(final boolean triggerDirectAppend) throws Exception {
byte [] tableName = Bytes.toBytes(getName());
HRegionInfo regioninfo = new HRegionInfo(tableName,
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
- Path subdir = new Path(dir, "hlogdir");
+ Path subdir = new Path(dir, "hlogdir" + triggerDirectAppend);
Path archdir = new Path(dir, "hlogdir_archive");
HLog wal = new HLog(fs, subdir, archdir, conf);
final int total = 20;
@@ -454,6 +468,7 @@ public class TestHLog {
public Exception exception = null;
public void run() {
try {
+ rlConf.setBoolean(FSHDFSUtils.TEST_TRIGGER_DFS_APPEND, triggerDirectAppend);
FSUtils.getInstance(fs, rlConf)
.recoverFileLease(recoveredFs, walPath, rlConf);
} catch (IOException e) {
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1470253&r1=1470252&r2=1470253&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Sun Apr 21 00:27:35 2013
@@ -33,6 +33,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -119,12 +120,11 @@ public class TestHLogSplit {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- TEST_UTIL.getConfiguration().
- setStrings("hbase.rootdir", hbaseDir.toString());
- TEST_UTIL.getConfiguration().
- setClass("hbase.regionserver.hlog.writer.impl",
- InstrumentedSequenceFileLogWriter.class, HLog.Writer.class);
-
+ TEST_UTIL.getConfiguration().setStrings("hbase.rootdir", hbaseDir.toString());
+ TEST_UTIL.getConfiguration().setClass("hbase.regionserver.hlog.writer.impl",
+ InstrumentedSequenceFileLogWriter.class, HLog.Writer.class);
+ TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true);
+ TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
TEST_UTIL.startMiniDFSCluster(2);
}
@@ -189,11 +189,12 @@ public class TestHLogSplit {
generateHLogs(-1);
+ CountDownLatch latch = new CountDownLatch(1);
try {
- (new ZombieNewLogWriterRegionServer(stop)).start();
- HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
- hbaseDir, hlogDir, oldLogDir, fs);
- logSplitter.splitLog();
+ (new ZombieNewLogWriterRegionServer(latch, stop)).start();
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, hbaseDir, hlogDir, oldLogDir,
+ fs);
+ logSplitter.splitLog(latch);
} finally {
stop.set(true);
}
@@ -553,16 +554,23 @@ public class TestHLogSplit {
AtomicBoolean stop = new AtomicBoolean(false);
generateHLogs(-1);
fs.initialize(fs.getUri(), conf);
- Thread zombie = new ZombieNewLogWriterRegionServer(stop);
+ CountDownLatch latch = new CountDownLatch(1);
+ Thread zombie = new ZombieNewLogWriterRegionServer(latch, stop);
+ List<Path> splits = null;
try {
zombie.start();
try {
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
hbaseDir, hlogDir, oldLogDir, fs);
- logSplitter.splitLog();
- } catch (IOException ex) {/* expected */}
- int logFilesNumber = fs.listStatus(hlogDir).length;
+ splits = logSplitter.splitLog(latch);
+ } catch (IOException ex) {
+ /* expected */
+ LOG.warn("testSplitWillNotTouchLogsIfNewHLogGetsCreatedAfterSplitStarted", ex);
+ }
+ FileStatus[] files = fs.listStatus(hlogDir);
+ if (files == null) fail("no files in " + hlogDir + " with splits " + splits);
+ int logFilesNumber = files.length;
assertEquals("Log files should not be archived if there's an extra file after split",
NUM_WRITERS + 1, logFilesNumber);
@@ -959,8 +967,10 @@ public class TestHLogSplit {
*/
class ZombieNewLogWriterRegionServer extends Thread {
AtomicBoolean stop;
- public ZombieNewLogWriterRegionServer(AtomicBoolean stop) {
+ CountDownLatch latch;
+ public ZombieNewLogWriterRegionServer(CountDownLatch latch, AtomicBoolean stop) {
super("ZombieNewLogWriterRegionServer");
+ this.latch = latch;
this.stop = stop;
}
@@ -977,7 +987,7 @@ public class TestHLogSplit {
try {
while (!fs.exists(recoveredEdits) && !stop.get()) {
- flushToConsole("Juliet: split not started, sleeping a bit...");
+ LOG.info("Juliet: split not started, sleeping a bit...");
Threads.sleep(10);
}
@@ -987,8 +997,10 @@ public class TestHLogSplit {
appendEntry(writer, "juliet".getBytes(), ("juliet").getBytes(),
("r").getBytes(), FAMILY, QUALIFIER, VALUE, 0);
writer.close();
- flushToConsole("Juliet file creator: created file " + julietLog);
+ LOG.info("Juliet file creator: created file " + julietLog);
+ latch.countDown();
} catch (IOException e1) {
+ LOG.error("Failed to create file " + julietLog, e1);
assertTrue("Failed to create file " + julietLog, false);
}
}
@@ -1192,7 +1204,7 @@ public class TestHLogSplit {
}
if (i != leaveOpen) {
writer[i].close();
- flushToConsole("Closing writer " + i);
+ LOG.info("Closing writer " + i);
}
}
}
@@ -1305,8 +1317,9 @@ public class TestHLogSplit {
byte[] row, byte[] family, byte[] qualifier,
byte[] value, long seq)
throws IOException {
-
+ LOG.info(Thread.currentThread().getName() + " append");
writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
+ LOG.info(Thread.currentThread().getName() + " sync");
writer.sync();
return seq;
}