You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2009/10/28 04:58:03 UTC
svn commit: r830434 - in /hadoop/hbase/trunk: CHANGES.txt
src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
src/java/org/apache/hadoop/hbase/regionserver/LogFlusher.java
src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
Author: jdcryans
Date: Wed Oct 28 03:58:03 2009
New Revision: 830434
URL: http://svn.apache.org/viewvc?rev=830434&view=rev
Log:
HBASE-1936 HLog group commit
Removed:
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/LogFlusher.java
Modified:
hadoop/hbase/trunk/CHANGES.txt
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=830434&r1=830433&r2=830434&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Wed Oct 28 03:58:03 2009
@@ -142,6 +142,7 @@
HBASE-1918 Don't do DNS resolving in .META. scanner for each row
HBASE-1756 Refactor HLog (changing package first)
HBASE-1926 Remove unused xmlenc jar from trunk
+ HBASE-1936 HLog group commit
OPTIMIZATIONS
HBASE-410 [testing] Speed up the test suite
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=830434&r1=830433&r2=830434&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Oct 28 03:58:03 2009
@@ -91,8 +91,6 @@
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.LogFlusher;
-import org.apache.hadoop.hbase.regionserver.LogRoller;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.InfoServer;
@@ -207,7 +205,6 @@
// eclipse warning when accessed by inner classes
protected volatile HLog hlog;
LogRoller hlogRoller;
- LogFlusher hlogFlusher;
// limit compactions while starting up
CompactionLimitThread compactionLimitThread;
@@ -329,10 +326,6 @@
// Log rolling thread
this.hlogRoller = new LogRoller(this);
- // Log flushing thread
- this.hlogFlusher =
- new LogFlusher(this.threadWakeFrequency, this.stopRequested);
-
// Background thread to check for major compactions; needed if region
// has not gotten updates in a while. Make it run at a lesser frequency.
int multiplier = this.conf.getInt(THREAD_WAKE_FREQUENCY +
@@ -518,7 +511,6 @@
try {
serverInfo.setStartCode(System.currentTimeMillis());
hlog = setupHLog();
- this.hlogFlusher.setHLog(hlog);
} catch (IOException e) {
this.abortRequested = true;
this.stopRequested.set(true);
@@ -616,7 +608,6 @@
// Send interrupts to wake up threads if sleeping so they notice shutdown.
// TODO: Should we check they are alive? If OOME could have exited already
cacheFlusher.interruptIfNecessary();
- hlogFlusher.interrupt();
compactSplitThread.interruptIfNecessary();
hlogRoller.interruptIfNecessary();
this.majorCompactionChecker.interrupt();
@@ -746,7 +737,6 @@
this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
this.hlog = setupHLog();
- this.hlogFlusher.setHLog(hlog);
// Init in here rather than in constructor after thread name has been set
this.metrics = new RegionServerMetrics();
startServiceThreads();
@@ -1131,8 +1121,6 @@
};
Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller",
handler);
- Threads.setDaemonThreadRunning(this.hlogFlusher, n + ".logFlusher",
- handler);
Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
handler);
Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor",
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=830434&r1=830433&r2=830434&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Wed Oct 28 03:58:03 2009
@@ -24,7 +24,6 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
@@ -34,14 +33,16 @@
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -59,15 +60,12 @@
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.regionserver.wal.LogRollListener;
-import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.SequenceFile.Reader;
@@ -126,11 +124,10 @@
private final long blocksize;
private final int flushlogentries;
private final AtomicInteger unflushedEntries = new AtomicInteger(0);
- private volatile long lastLogFlushTime;
- private final boolean append;
- private final Method syncfs;
private final short replicationLevel;
- private final static Object [] NO_ARGS = new Object []{};
+
+ // used to indirectly tell syncFs to force the sync
+ private final AtomicBoolean forceSync = new AtomicBoolean(false);
/*
* Current log file.
@@ -183,6 +180,11 @@
*/
private final int maxLogs;
+ /**
+ * Thread that handles group commit
+ */
+ private final LogSyncer logSyncerThread;
+
static byte [] COMPLETE_CACHE_FLUSH;
static {
try {
@@ -224,7 +226,6 @@
this.logrollsize = (long)(this.blocksize * multi);
this.optionalFlushInterval =
conf.getLong("hbase.regionserver.optionallogflushinterval", 10 * 1000);
- this.lastLogFlushTime = System.currentTimeMillis();
if (fs.exists(dir)) {
throw new IOException("Target HLog directory already exists: " + dir);
}
@@ -237,21 +238,9 @@
", flushlogentries=" + this.flushlogentries +
", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
rollWriter();
- // Test if syncfs is available.
- this.append = isAppend(conf);
- Method m = null;
- if (this.append) {
- try {
- m = this.writer.getClass().getMethod("syncFs", new Class<?> []{});
- LOG.debug("Using syncFs--hadoop-4379");
- } catch (SecurityException e) {
- throw new IOException("Failed test for syncfs", e);
- } catch (NoSuchMethodException e) {
- // This can happen
- LOG.info("syncFs--hadoop-4379 not available" );
- }
- }
- this.syncfs = m;
+ logSyncerThread = new LogSyncer(this.flushlogentries);
+ Threads.setDaemonThreadRunning(logSyncerThread,
+ Thread.currentThread().getName() + ".logSyncer");
}
/**
@@ -591,6 +580,14 @@
* @throws IOException
*/
public void close() throws IOException {
+ try {
+ logSyncerThread.interrupt();
+ // Make sure we synced everything
+ logSyncerThread.join();
+ } catch (InterruptedException e) {
+ LOG.error("Exception while waiting for syncer thread to die", e);
+ }
+
cacheFlushLock.lock();
try {
synchronized (updateLock) {
@@ -718,39 +715,118 @@
}
}
+ /**
+ * This thread is responsible for calling syncFs and buffering up the writers
+ * while it happens.
+ */
+ class LogSyncer extends Thread {
+
+ // Using fairness to make sure locks are given in order
+ private final ReentrantLock lock = new ReentrantLock(true);
+
+ // Condition used to wait until we have something to sync
+ private final Condition queueEmpty = lock.newCondition();
+
+ // Condition used to signal that the sync is done
+ private final Condition syncDone = lock.newCondition();
+
+ private final int optionalFlushInterval;
+
+ LogSyncer(int optionalFlushInterval) {
+ this.optionalFlushInterval = optionalFlushInterval;
+ }
+
+ public void run() {
+ try {
+ lock.lock();
+ while(!closed) {
+
+ // Wait until something has to be synced or do it if we waited enough
+ // time (useful if something appends but does not sync).
+ queueEmpty.await(this.optionalFlushInterval, TimeUnit.MILLISECONDS);
+
+ // We got the signal, let's syncFS. We currently own the lock so new
+ // writes are waiting to acquire it in addToSyncQueue while the ones
+ // we sync are waiting on await()
+ hflush();
+
+ // Release all the clients waiting on the sync. Notice that we still
+ // own the lock until we get back to await at which point all the
+ // other threads waiting will first acquire and release locks
+ syncDone.signalAll();
+ }
+ } catch (IOException e) {
+ LOG.error("Error while syncing, requesting close of hlog ", e);
+ requestLogRoll();
+ } catch (InterruptedException e) {
+ LOG.debug(getName() + "interrupted while waiting for sync requests",e );
+ } finally {
+ lock.unlock();
+ LOG.info(getName() + " exiting");
+ }
+ }
+
+ /**
+ * This method first signals the thread that there's a sync needed
+ * and then waits for it to happen before returning.
+ */
+ public void addToSyncQueue() {
+
+ // Don't bother if somehow our append was already synced
+ if (unflushedEntries.get() == 0) {
+ return;
+ }
+ lock.lock();
+ try {
+ // Wake the thread
+ queueEmpty.signal();
+
+ // Wait for it to syncFs
+ syncDone.await();
+ } catch (InterruptedException e) {
+ LOG.debug(getName() + " was interrupted while waiting for sync", e);
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ }
+
public void sync() throws IOException {
sync(false);
}
/**
+ * This method calls the LogSyncer in order to group-commit the sync
+ * with other threads.
+ * @param force For catalog regions, force the sync to happen
+ * @throws IOException
+ */
+ public void sync(boolean force) throws IOException {
+ // Set force sync if force is true and forceSync is false
+ forceSync.compareAndSet(!forceSync.get() && force, true);
+ logSyncerThread.addToSyncQueue();
+ }
+
+ /**
* Multiple threads will call sync() at the same time, only the winner
* will actually flush if there is any race or build up.
*
- * @param force sync regardless (for meta updates) if there is data
* @throws IOException
*/
- public void sync(boolean force) throws IOException {
+ protected void hflush() throws IOException {
synchronized (this.updateLock) {
if (this.closed)
return;
- if (this.unflushedEntries.get() == 0)
- return; // win
-
- if (force || this.unflushedEntries.get() > this.flushlogentries) {
+ if (this.forceSync.get() ||
+ this.unflushedEntries.get() > this.flushlogentries) {
try {
- lastLogFlushTime = System.currentTimeMillis();
- if (this.append && syncfs != null) {
- try {
- this.syncfs.invoke(this.writer, NO_ARGS);
- } catch (Exception e) {
- throw new IOException("Reflection", e);
- }
- } else {
- this.writer.sync();
- if (this.writer_out != null)
- this.writer_out.sync();
+ this.writer.sync();
+ if (this.writer_out != null) {
+ this.writer_out.sync();
}
+ this.forceSync.compareAndSet(true, false);
this.unflushedEntries.set(0);
} catch (IOException e) {
LOG.fatal("Could not append. Requesting close of hlog", e);
@@ -986,7 +1062,6 @@
int concurrentLogReads =
conf.getInt("hbase.regionserver.hlog.splitlog.reader.threads", 3);
// Is append supported?
- boolean append = isAppend(conf);
try {
int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) /
concurrentLogReads)).intValue();
@@ -1005,7 +1080,6 @@
LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
}
- recoverLog(fs, logfiles[i].getPath(), append);
SequenceFile.Reader in = null;
int count = 0;
try {
@@ -1160,24 +1234,6 @@
}
/**
- * @param conf
- * @return True if append enabled and we have the syncFs in our path.
- */
- private static boolean isAppend(final HBaseConfiguration conf) {
- boolean append = conf.getBoolean("dfs.support.append", false);
- if (append) {
- try {
- SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
- append = true;
- } catch (SecurityException e) {
- } catch (NoSuchMethodException e) {
- append = false;
- }
- }
- return append;
- }
-
- /**
* Utility class that lets us keep track of the edit with it's key
* Only used when splitting logs
*/
@@ -1224,40 +1280,6 @@
return getHLogDirectoryName(HServerInfo.getServerName(info));
}
- /*
- * Recover log.
- * If append has been set, try and open log in append mode.
- * Doing this, we get a hold of the file that crashed writer
- * was writing to. Once we have it, close it. This will
- * allow subsequent reader to see up to last sync.
- * @param fs
- * @param p
- * @param append
- */
- private static void recoverLog(final FileSystem fs, final Path p,
- final boolean append) {
- if (!append) {
- return;
- }
- // Trying recovery
- boolean recovered = false;
- while (!recovered) {
- try {
- FSDataOutputStream out = fs.append(p);
- out.close();
- recovered = true;
- } catch (IOException e) {
- LOG.info("Failed open for append, waiting on lease recovery: " + p, e);
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ex) {
- // ignore it and try again
- }
- }
- }
- LOG.info("Past out lease recovery");
- }
-
/**
* Construct the HLog directory name
*