You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2009/07/11 01:18:16 UTC
svn commit: r793145 - in /hadoop/hbase/trunk: CHANGES.txt
src/java/org/apache/hadoop/hbase/regionserver/HLog.java
Author: stack
Date: Fri Jul 10 23:18:15 2009
New Revision: 793145
URL: http://svn.apache.org/viewvc?rev=793145&view=rev
Log:
HBASE-1470 hbase and HADOOP-4379, dhruba's flush/sync
Modified:
hadoop/hbase/trunk/CHANGES.txt
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=793145&r1=793144&r2=793145&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Fri Jul 10 23:18:15 2009
@@ -468,6 +468,7 @@
HBASE-1643 ScanDeleteTracker takes comparator but it unused
HBASE-1603 MR failed "RetriesExhaustedException: Trying to contact region server
Some server for region TestTable..." -- deubugging
+ HBASE-1470 hbase and HADOOP-4379, dhruba's flush/sync
OPTIMIZATIONS
HBASE-1412 Change values for delete column and column family in KeyValue
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=793145&r1=793144&r2=793145&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Fri Jul 10 23:18:15 2009
@@ -23,6 +23,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
@@ -63,6 +64,7 @@
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.fs.FSDataOutputStream;
/**
* HLog stores all the edits to the HStore.
@@ -112,7 +114,10 @@
private final int flushlogentries;
private final AtomicInteger unflushedEntries = new AtomicInteger(0);
private volatile long lastLogFlushTime;
-
+ private final boolean append;
+ private final Method syncfs;
+ private final static Object [] NO_ARGS = new Object []{};
+
/*
* Current log file.
*/
@@ -213,6 +218,21 @@
", flushlogentries=" + this.flushlogentries +
", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
rollWriter();
+ // Test if syncfs is available.
+ this.append = conf.getBoolean("dfs.support.append", false);
+ Method m = null;
+ if (this.append) {
+ try {
+ m = this.writer.getClass().getMethod("syncFs", new Class<?> []{});
+ LOG.debug("Using syncFs--hadoop-4379");
+ } catch (SecurityException e) {
+ throw new IOException("Failed test for syncfs", e);
+ } catch (NoSuchMethodException e) {
+ // This can happen
+ LOG.info("syncFs--hadoop-4379 not available" );
+ }
+ }
+ this.syncfs = m;
}
/**
@@ -585,7 +605,15 @@
public void sync() throws IOException {
lastLogFlushTime = System.currentTimeMillis();
- this.writer.sync();
+ if (this.append && syncfs != null) {
+ try {
+ this.syncfs.invoke(this.writer, NO_ARGS);
+ } catch (Exception e) {
+ throw new IOException("Reflection", e);
+ }
+ } else {
+ this.writer.sync();
+ }
this.unflushedEntries.set(0);
}
@@ -821,15 +849,16 @@
step * DEFAULT_NUMBER_CONCURRENT_LOG_READS +
DEFAULT_NUMBER_CONCURRENT_LOG_READS;
for (int i = (step * 10); i < endIndex; i++) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
- ": " + logfiles[i].getPath() +
- ", length=" + logfiles[i].getLen());
- }
// Check for possibly empty file. With appends, currently Hadoop
// reports a zero length even if the file has been sync'd. Revisit if
// HADOOP-4751 is committed.
long length = logfiles[i].getLen();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
+ ": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
+ }
+ boolean append = conf.getBoolean("dfs.support.append", false);
+ recoverLog(fs, logfiles[i].getPath(), append);
SequenceFile.Reader in = null;
int count = 0;
try {
@@ -853,10 +882,10 @@
key = new HLogKey();
val = new KeyValue();
}
- LOG.debug("Pushed " + count + " entries from " +
+ LOG.debug("Pushed=" + count + " entries from " +
logfiles[i].getPath());
} catch (IOException e) {
- LOG.debug("IOE Pushed " + count + " entries from " +
+ LOG.debug("IOE Pushed=" + count + " entries from " +
logfiles[i].getPath());
e = RemoteExceptionHandler.checkIOException(e);
if (!(e instanceof EOFException)) {
@@ -866,7 +895,7 @@
}
} catch (IOException e) {
if (length <= 0) {
- LOG.warn("Empty hlog, continuing: " + logfiles[i]);
+ LOG.warn("Empty hlog, continuing: " + logfiles[i] + " count=" + count, e);
continue;
}
throw e;
@@ -943,9 +972,6 @@
fs.delete(oldlogfile, true);
}
}
- if (wap == null) {
- throw new NullPointerException();
- }
wap.w.append(logEntry.getKey(), logEntry.getEdit());
count++;
}
@@ -1030,6 +1056,40 @@
public static String getHLogDirectoryName(HServerInfo info) {
return getHLogDirectoryName(HServerInfo.getServerName(info));
}
+
+ /*
+ * Recover log.
+ * If append has been set, try and open log in append mode.
+ * Doing this, we get a hold of the file that crashed writer
+ * was writing to. Once we have it, close it. This will
+ * allow subsequent reader to see up to last sync.
+ * @param fs
+ * @param p
+ * @param append
+ */
+ private static void recoverLog(final FileSystem fs, final Path p,
+ final boolean append) {
+ if (!append) {
+ return;
+ }
+ // Trying recovery
+ boolean recovered = false;
+ while (!recovered) {
+ try {
+ FSDataOutputStream out = fs.append(p);
+ out.close();
+ recovered = true;
+ } catch (IOException e) {
+ LOG.info("Failed open for append, waiting on lease recovery: " + p, e);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ // ignore it and try again
+ }
+ }
+ }
+ LOG.info("Past out lease recovery");
+ }
/**
* Construct the HLog directory name