You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2009/05/15 15:45:20 UTC
svn commit: r775133 - in /hadoop/hbase/branches/0.19: CHANGES.txt
src/java/org/apache/hadoop/hbase/HConstants.java
src/java/org/apache/hadoop/hbase/regionserver/HLog.java
Author: jdcryans
Date: Fri May 15 13:45:20 2009
New Revision: 775133
URL: http://svn.apache.org/viewvc?rev=775133&view=rev
Log:
HBASE-1008 [performance] The replay of logs on server crash takes way too long
Modified:
hadoop/hbase/branches/0.19/CHANGES.txt
hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/HConstants.java
hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
Modified: hadoop/hbase/branches/0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/CHANGES.txt?rev=775133&r1=775132&r2=775133&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.19/CHANGES.txt Fri May 15 13:45:20 2009
@@ -1,5 +1,5 @@
HBase Change Log
-Release 0.19.2 - Unreleased
+Release 0.19.3 - Unreleased
BUG FIXES
HBASE-1413 fall back to filesystem block size default if
hbase.regionserver.hlog.blocksize is not specified
@@ -12,6 +12,7 @@
HBASE-1418 Transactional improvements and fixes (Clint Morgan via Stack)
HBASE-1424 have shell print regioninfo and location on first load if
DEBUG enabled
+ HBASE-1008 [performance] The replay of logs on server crash takes way too long
Release 0.19.2 - May 9th, 2009
BUG FIXES
Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/HConstants.java?rev=775133&r1=775132&r2=775133&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/HConstants.java Fri May 15 13:45:20 2009
@@ -120,6 +120,10 @@
/** Default size of a reservation block */
static final int DEFAULT_SIZE_RESERVATION_BLOCK = 1024 * 1024 * 5;
+ /** Default number of threads used to rewrite the logs during log splitting.
+ * More threads means faster splitting but higher memory consumption. */
+ static final int DEFAULT_NUMBER_LOG_WRITER_THREAD = 10;
+
// Always store the location of the root table's HRegion.
// This HRegion is never split.
Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=775133&r1=775132&r2=775133&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Fri May 15 13:45:20 2009
@@ -25,10 +25,14 @@
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -718,6 +722,7 @@
public static void splitLog(final Path rootDir, final Path srcDir,
final FileSystem fs, final Configuration conf)
throws IOException {
+ long millis = System.currentTimeMillis();
if (!fs.exists(srcDir)) {
// Nothing to do
return;
@@ -738,7 +743,9 @@
io.initCause(e);
throw io;
}
- LOG.info("log file splitting completed for " + srcDir.toString());
+ long endMillis = System.currentTimeMillis();
+ LOG.info("log file splitting completed in " + (endMillis - millis) +
+ " millis for " + srcDir.toString());
}
/*
@@ -749,101 +756,150 @@
* @throws IOException
*/
private static void splitLog(final Path rootDir, final FileStatus [] logfiles,
- final FileSystem fs, final Configuration conf)
+ final FileSystem fs, final Configuration conf)
throws IOException {
- Map<byte [], SequenceFile.Writer> logWriters =
+ final Map<byte [], SequenceFile.Writer> logWriters =
new TreeMap<byte [], SequenceFile.Writer>(Bytes.BYTES_COMPARATOR);
+ final Map<byte[], LinkedList<HLogEntry>> logEntries =
+ new TreeMap<byte[], LinkedList<HLogEntry>>(Bytes.BYTES_COMPARATOR);
try {
for (int i = 0; i < logfiles.length; i++) {
if (LOG.isDebugEnabled()) {
LOG.debug("Splitting " + (i + 1) + " of " + logfiles.length + ": " +
- logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
+ logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
}
// Check for possibly empty file. With appends, currently Hadoop reports
// a zero length even if the file has been sync'd. Revisit if
// HADOOP-4751 is committed.
- boolean possiblyEmpty = logfiles[i].getLen() <= 0;
+ long length = logfiles[i].getLen();
HLogKey key = new HLogKey();
HLogEdit val = new HLogEdit();
+ SequenceFile.Reader in = null;
try {
- SequenceFile.Reader in =
- new SequenceFile.Reader(fs, logfiles[i].getPath(), conf);
+ in = new SequenceFile.Reader(fs, logfiles[i].getPath(), conf);
try {
int count = 0;
- for (; in.next(key, val); count++) {
- byte [] tableName = key.getTablename();
- byte [] regionName = key.getRegionName();
- SequenceFile.Writer w = logWriters.get(regionName);
- if (w == null) {
- Path logfile = new Path(
- HRegion.getRegionDir(
- HTableDescriptor.getTableDir(rootDir, tableName),
- HRegionInfo.encodeRegionName(regionName)),
- HREGION_OLDLOGFILE_NAME);
- Path oldlogfile = null;
- SequenceFile.Reader old = null;
- if (fs.exists(logfile)) {
- LOG.warn("Old log file " + logfile +
- " already exists. Copying existing file to new file");
- oldlogfile = new Path(logfile.toString() + ".old");
- fs.rename(logfile, oldlogfile);
- old = new SequenceFile.Reader(fs, oldlogfile, conf);
- }
- w = SequenceFile.createWriter(fs, conf, logfile, HLogKey.class,
- HLogEdit.class, getCompressionType(conf));
- // Use copy of regionName; regionName object is reused inside in
- // HStoreKey.getRegionName so its content changes as we iterate.
- logWriters.put(regionName, w);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Creating new log file writer for path " + logfile +
- " and region " + Bytes.toString(regionName));
- }
-
- if (old != null) {
- // Copy from existing log file
- HLogKey oldkey = new HLogKey();
- HLogEdit oldval = new HLogEdit();
- for (; old.next(oldkey, oldval); count++) {
- if (LOG.isDebugEnabled() && count > 0 && count % 10000 == 0) {
- LOG.debug("Copied " + count + " edits");
- }
- w.append(oldkey, oldval);
- }
- old.close();
- fs.delete(oldlogfile, true);
- }
+ while (in.next(key, val)) {
+ byte[] regionName = key.getRegionName();
+ LinkedList<HLogEntry> queue = logEntries.get(regionName);
+ if (queue == null) {
+ queue = new LinkedList<HLogEntry>();
+ LOG.debug("Adding queue for " + Bytes.toString(regionName));
+ logEntries.put(regionName, queue);
}
- w.append(key, val);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Applied " + count + " total edits from " +
- logfiles[i].getPath().toString());
+ queue.push(new HLogEntry(val, key));
+ count++;
}
+ LOG.debug("Pushed " + count + " entries");
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
if (!(e instanceof EOFException)) {
LOG.warn("Exception processing " + logfiles[i].getPath() +
" -- continuing. Possible DATA LOSS!", e);
}
- } finally {
- try {
- in.close();
- } catch (IOException e) {
- LOG.warn("Close in finally threw exception -- continuing", e);
- }
- // Delete the input file now so we do not replay edits. We could
- // have gotten here because of an exception. If so, probably
- // nothing we can do about it. Replaying it, it could work but we
- // could be stuck replaying for ever. Just continue though we
- // could have lost some edits.
- fs.delete(logfiles[i].getPath(), true);
}
} catch (IOException e) {
- if (possiblyEmpty) {
+ if (length <= 0) {
+ LOG.warn("Empty log, continuing: " + logfiles[i]);
continue;
}
throw e;
+ } finally {
+ try {
+ if (in != null) {
+ in.close();
+ }
+ } catch (IOException e) {
+ LOG.warn("Close in finally threw exception -- continuing", e);
+ }
+ // Delete the input file now so we do not replay edits. We could
+ // have gotten here because of an exception. If so, probably
+ // nothing we can do about it. Replaying it, it could work but we
+ // could be stuck replaying for ever. Just continue though we
+ // could have lost some edits.
+ fs.delete(logfiles[i].getPath(), true);
+ }
+ }
+ ExecutorService threadPool =
+ Executors.newFixedThreadPool(DEFAULT_NUMBER_LOG_WRITER_THREAD);
+ for (final byte[] key : logEntries.keySet()) {
+
+ Thread thread = new Thread(Bytes.toString(key)) {
+ public void run() {
+ LinkedList<HLogEntry> entries = logEntries.get(key);
+ LOG.debug("Thread got " + entries.size() + " to process");
+ long threadTime = System.currentTimeMillis();
+ try {
+ int count = 0;
+ for (HLogEntry logEntry : entries) {
+ SequenceFile.Writer w = logWriters.get(key);
+ if (w == null) {
+ Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor
+ .getTableDir(rootDir, logEntry.getKey().getTablename()),
+ HRegionInfo.encodeRegionName(key)),
+ HREGION_OLDLOGFILE_NAME);
+ Path oldlogfile = null;
+ SequenceFile.Reader old = null;
+ if (fs.exists(logfile)) {
+ LOG.warn("Old log file " + logfile
+ + " already exists. Copying existing file to new file");
+ oldlogfile = new Path(logfile.toString() + ".old");
+ fs.rename(logfile, oldlogfile);
+ old = new SequenceFile.Reader(fs, oldlogfile, conf);
+ }
+ w = SequenceFile.createWriter(fs, conf, logfile,
+ HLogKey.class, HLogEdit.class, getCompressionType(conf));
+ // Use copy of regionName; regionName object is reused inside
+ // in
+ // HStoreKey.getRegionName so its content changes as we
+ // iterate.
+ logWriters.put(key, w);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating new log file writer for path "
+ + logfile + " and region " + Bytes.toString(key));
+ }
+
+ if (old != null) {
+ // Copy from existing log file
+ HLogKey oldkey = new HLogKey();
+ HLogEdit oldval = new HLogEdit();
+ for (; old.next(oldkey, oldval); count++) {
+ if (LOG.isDebugEnabled() && count > 0
+ && count % 10000 == 0) {
+ LOG.debug("Copied " + count + " edits");
+ }
+ w.append(oldkey, oldval);
+ }
+ old.close();
+ fs.delete(oldlogfile, true);
+ }
+ }
+ w.append(logEntry.getKey(), logEntry.getEdit());
+ count++;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Applied " + count + " total edits to "
+ + Bytes.toString(key) + " in "
+ + (System.currentTimeMillis() - threadTime) + "ms");
+ }
+ } catch (IOException e) {
+ e = RemoteExceptionHandler.checkIOException(e);
+ LOG.warn("Got while writing region " + Bytes.toString(key)
+ + " log " + e);
+ e.printStackTrace();
+ }
+ }
+ };
+ threadPool.execute(thread);
+ }
+ threadPool.shutdown();
+ // Wait for all threads to terminate
+ try {
+ for(int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS) ; i++) {
+ LOG.debug("Waiting for log writers to terminate, iteration #" + i);
}
+ }catch(InterruptedException ex) {
+ LOG.warn("Log writers were interrupted, possible data loss!");
}
} finally {
for (SequenceFile.Writer w : logWriters.values()) {
@@ -851,6 +907,40 @@
}
}
}
+
+ /**
+ * Utility class that lets us keep track of an edit together with its key.
+ * Only used when splitting logs.
+ */
+ public static class HLogEntry {
+ private HLogEdit edit;
+ private HLogKey key;
+ /**
+ * Constructor for both params
+ * @param edit log's edit
+ * @param key log's key
+ */
+ public HLogEntry(HLogEdit edit, HLogKey key) {
+ super();
+ this.edit = edit;
+ this.key = key;
+ }
+ /**
+ * Gets the edit
+ * @return edit
+ */
+ public HLogEdit getEdit() {
+ return edit;
+ }
+ /**
+ * Gets the key
+ * @return key
+ */
+ public HLogKey getKey() {
+ return key;
+ }
+
+ }
/**
* Construct the HLog directory name