You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2009/05/20 19:06:30 UTC
svn commit: r776768 - in /hadoop/hbase/branches/0.19: CHANGES.txt
src/java/org/apache/hadoop/hbase/HConstants.java
src/java/org/apache/hadoop/hbase/regionserver/HLog.java
Author: jdcryans
Date: Wed May 20 17:06:27 2009
New Revision: 776768
URL: http://svn.apache.org/viewvc?rev=776768&view=rev
Log:
HBASE-1430 Read the logs in batches during log splitting to avoid OOME
Modified:
hadoop/hbase/branches/0.19/CHANGES.txt
hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/HConstants.java
hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
Modified: hadoop/hbase/branches/0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/CHANGES.txt?rev=776768&r1=776767&r2=776768&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.19/CHANGES.txt Wed May 20 17:06:27 2009
@@ -27,6 +27,7 @@
N minutes/hours
HBASE-1420 add abliity to add and remove table) indexes on existing
tables (Clint Morgan via Stack)
+ HBASE-1430 Read the logs in batches during log splitting to avoid OOME
Release 0.19.2 - May 9th, 2009
BUG FIXES
Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/HConstants.java?rev=776768&r1=776767&r2=776768&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/HConstants.java Wed May 20 17:06:27 2009
@@ -124,6 +124,10 @@
* to rewrite the logs. More means faster but bigger mem consumption */
static final int DEFAULT_NUMBER_LOG_WRITER_THREAD = 10;
+ /** Default number of logs read into memory per batch
+ * when log splitting. More means faster but bigger mem consumption */
+ static final int DEFAULT_NUMBER_CONCURRENT_LOG_READS = 10;
+
// Always store the location of the root table's HRegion.
// This HRegion is never split.
Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=776768&r1=776767&r2=776768&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Wed May 20 17:06:27 2009
@@ -761,156 +761,171 @@
*/
private static void splitLog(final Path rootDir, final FileStatus [] logfiles,
final FileSystem fs, final Configuration conf)
- throws IOException {
- final Map<byte [], SequenceFile.Writer> logWriters =
- new TreeMap<byte [], SequenceFile.Writer>(Bytes.BYTES_COMPARATOR);
- final Map<byte[], LinkedList<HLogEntry>> logEntries =
- new TreeMap<byte[], LinkedList<HLogEntry>>(Bytes.BYTES_COMPARATOR);
- try {
- for (int i = 0; i < logfiles.length; i++) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Splitting " + (i + 1) + " of " + logfiles.length + ": " +
- logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
- }
- // Check for possibly empty file. With appends, currently Hadoop reports
- // a zero length even if the file has been sync'd. Revisit if
- // HADOOP-4751 is committed.
- long length = logfiles[i].getLen();
- HLogKey key = new HLogKey();
- HLogEdit val = new HLogEdit();
- SequenceFile.Reader in = null;
- try {
- in = new SequenceFile.Reader(fs, logfiles[i].getPath(), conf);
- try {
- int count = 0;
- while (in.next(key, val)) {
- byte[] regionName = key.getRegionName();
- LinkedList<HLogEntry> queue = logEntries.get(regionName);
- if (queue == null) {
- queue = new LinkedList<HLogEntry>();
- LOG.debug("Adding queue for " + Bytes.toString(regionName));
- logEntries.put(regionName, queue);
- }
- queue.push(new HLogEntry(val, key));
- count++;
- }
- LOG.debug("Pushed " + count + " entries");
- } catch (IOException e) {
- e = RemoteExceptionHandler.checkIOException(e);
- if (!(e instanceof EOFException)) {
- LOG.warn("Exception processing " + logfiles[i].getPath() +
- " -- continuing. Possible DATA LOSS!", e);
+ throws IOException {
+ // The writer threads below get and put entries in this map concurrently
+ // and TreeMap is not thread-safe, so wrap it in a synchronized view.
+ final Map<byte [], SequenceFile.Writer> logWriters =
+ java.util.Collections.synchronizedMap(
+ new TreeMap<byte [], SequenceFile.Writer>(Bytes.BYTES_COMPARATOR));
+
+ try {
+ int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) /
+ DEFAULT_NUMBER_CONCURRENT_LOG_READS)).intValue();
+ for(int step = 0; step < maxSteps; step++) {
+ final Map<byte[], LinkedList<HLogEntry>> logEntries =
+ new TreeMap<byte[], LinkedList<HLogEntry>>(Bytes.BYTES_COMPARATOR);
+ // This batch covers logfiles[startIndex, endIndex); stop at
+ // logfiles.length when it's the last step.
+ int startIndex = step * DEFAULT_NUMBER_CONCURRENT_LOG_READS;
+ int endIndex = step == maxSteps - 1 ? logfiles.length :
+ startIndex + DEFAULT_NUMBER_CONCURRENT_LOG_READS;
+ for (int i = startIndex; i < endIndex; i++) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
+ ": " + logfiles[i].getPath() +
+ ", length=" + logfiles[i].getLen());
}
- }
- } catch (IOException e) {
- if (length <= 0) {
- LOG.warn("Empty log, continuing: " + logfiles[i]);
- continue;
- }
- throw e;
- } finally {
- try {
- if (in != null) {
- in.close();
+ // Check for possibly empty file. With appends, currently Hadoop
+ // reports a zero length even if the file has been sync'd. Revisit if
+ // HADOOP-4751 is committed.
+ long length = logfiles[i].getLen();
+ HLogKey key = new HLogKey();
+ HLogEdit val = new HLogEdit();
+ SequenceFile.Reader in = null;
+ try {
+ in = new SequenceFile.Reader(fs, logfiles[i].getPath(), conf);
+ try {
+ int count = 0;
+ while (in.next(key, val)) {
+ byte [] regionName = key.getRegionName();
+ LinkedList<HLogEntry> queue = logEntries.get(regionName);
+ if (queue == null) {
+ queue = new LinkedList<HLogEntry>();
+ LOG.debug("Adding queue for " + Bytes.toString(regionName));
+ logEntries.put(regionName, queue);
+ }
+ // Append so each region's edits stay in the order they were
+ // logged (oldest first); the writers below replay them in list order.
+ queue.addLast(new HLogEntry(val, key));
+ count++;
+ }
+ LOG.debug("Pushed " + count + " entries from " +
+ logfiles[i].getPath());
+ } catch (IOException e) {
+ e = RemoteExceptionHandler.checkIOException(e);
+ if (!(e instanceof EOFException)) {
+ LOG.warn("Exception processing " + logfiles[i].getPath() +
+ " -- continuing. Possible DATA LOSS!", e);
+ }
+ }
+ } catch (IOException e) {
+ if (length <= 0) {
+ LOG.warn("Empty hlog, continuing: " + logfiles[i]);
+ continue;
+ }
+ throw e;
+ } finally {
+ try {
+ if (in != null) {
+ in.close();
+ }
+ } catch (IOException e) {
+ LOG.warn("Close in finally threw exception -- continuing", e);
+ }
+ // Delete the input file now so we do not replay edits. We could
+ // have gotten here because of an exception. If so, probably
+ // nothing we can do about it. Replaying it, it could work but we
+ // could be stuck replaying for ever. Just continue though we
+ // could have lost some edits.
+ fs.delete(logfiles[i].getPath(), true);
}
- } catch (IOException e) {
- LOG.warn("Close in finally threw exception -- continuing", e);
}
- // Delete the input file now so we do not replay edits. We could
- // have gotten here because of an exception. If so, probably
- // nothing we can do about it. Replaying it, it could work but we
- // could be stuck replaying for ever. Just continue though we
- // could have lost some edits.
- fs.delete(logfiles[i].getPath(), true);
- }
- }
- ExecutorService threadPool =
- Executors.newFixedThreadPool(DEFAULT_NUMBER_LOG_WRITER_THREAD);
- for (final byte[] key : logEntries.keySet()) {
+ ExecutorService threadPool =
+ Executors.newFixedThreadPool(DEFAULT_NUMBER_LOG_WRITER_THREAD);
+ for (final byte[] key : logEntries.keySet()) {
- Thread thread = new Thread(Bytes.toString(key)) {
- public void run() {
- LinkedList<HLogEntry> entries = logEntries.get(key);
- LOG.debug("Thread got " + entries.size() + " to process");
- long threadTime = System.currentTimeMillis();
- try {
- int count = 0;
- for (HLogEntry logEntry : entries) {
- SequenceFile.Writer w = logWriters.get(key);
- if (w == null) {
- Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor
- .getTableDir(rootDir, logEntry.getKey().getTablename()),
- HRegionInfo.encodeRegionName(key)),
- HREGION_OLDLOGFILE_NAME);
- Path oldlogfile = null;
- SequenceFile.Reader old = null;
- if (fs.exists(logfile)) {
- LOG.warn("Old log file " + logfile
- + " already exists. Copying existing file to new file");
- oldlogfile = new Path(logfile.toString() + ".old");
- fs.rename(logfile, oldlogfile);
- old = new SequenceFile.Reader(fs, oldlogfile, conf);
- }
- w = SequenceFile.createWriter(fs, conf, logfile,
- HLogKey.class, HLogEdit.class, getCompressionType(conf));
- // Use copy of regionName; regionName object is reused inside
- // in
- // HStoreKey.getRegionName so its content changes as we
- // iterate.
- logWriters.put(key, w);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Creating new log file writer for path "
- + logfile + " and region " + Bytes.toString(key));
- }
+ Thread thread = new Thread(Bytes.toString(key)) {
+ public void run() {
+ LinkedList<HLogEntry> entries = logEntries.get(key);
+ LOG.debug("Thread got " + entries.size() + " to process");
+ long threadTime = System.currentTimeMillis();
+ try {
+ int count = 0;
+ for (HLogEntry logEntry : entries) {
+ SequenceFile.Writer w = logWriters.get(key);
+ if (w == null) {
+ Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor
+ .getTableDir(rootDir, logEntry.getKey().getTablename()),
+ HRegionInfo.encodeRegionName(key)),
+ HREGION_OLDLOGFILE_NAME);
+ Path oldlogfile = null;
+ SequenceFile.Reader old = null;
+ if (fs.exists(logfile)) {
+ LOG.warn("Old hlog file " + logfile
+ + " already exists. Copying existing file to new file");
+ oldlogfile = new Path(logfile.toString() + ".old");
+ fs.rename(logfile, oldlogfile);
+ old = new SequenceFile.Reader(fs, oldlogfile, conf);
+ }
+ w = SequenceFile.createWriter(fs, conf, logfile,
+ HLogKey.class, HLogEdit.class, getCompressionType(conf));
+ logWriters.put(key, w);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating new hlog file writer for path "
+ + logfile + " and region " + Bytes.toString(key));
+ }
- if (old != null) {
- // Copy from existing log file
- HLogKey oldkey = new HLogKey();
- HLogEdit oldval = new HLogEdit();
- for (; old.next(oldkey, oldval); count++) {
- if (LOG.isDebugEnabled() && count > 0
- && count % 10000 == 0) {
- LOG.debug("Copied " + count + " edits");
+ if (old != null) {
+ // Copy from existing log file
+ HLogKey oldkey = new HLogKey();
+ HLogEdit oldval = new HLogEdit();
+ for (; old.next(oldkey, oldval); count++) {
+ if (LOG.isDebugEnabled() && count > 0
+ && count % 10000 == 0) {
+ LOG.debug("Copied " + count + " edits");
+ }
+ w.append(oldkey, oldval);
+ }
+ old.close();
+ fs.delete(oldlogfile, true);
}
- w.append(oldkey, oldval);
}
- old.close();
- fs.delete(oldlogfile, true);
+ w.append(logEntry.getKey(), logEntry.getEdit());
+ count++;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Applied " + count + " total edits to "
+ + Bytes.toString(key) + " in "
+ + (System.currentTimeMillis() - threadTime) + "ms");
}
+ } catch (IOException e) {
+ e = RemoteExceptionHandler.checkIOException(e);
+ LOG.warn("Got while writing region " + Bytes.toString(key)
+ + " log " + e);
+ e.printStackTrace();
}
- w.append(logEntry.getKey(), logEntry.getEdit());
- count++;
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Applied " + count + " total edits to "
- + Bytes.toString(key) + " in "
- + (System.currentTimeMillis() - threadTime) + "ms");
- }
- } catch (IOException e) {
- e = RemoteExceptionHandler.checkIOException(e);
- LOG.warn("Got while writing region " + Bytes.toString(key)
- + " log " + e);
- e.printStackTrace();
+ };
+ threadPool.execute(thread);
+ }
+ threadPool.shutdown();
+ // Wait for all threads to terminate
+ try {
+ for(int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); i++) {
+ LOG.debug("Waiting for hlog writers to terminate, iteration #" + i);
}
+ }catch(InterruptedException ex) {
+ LOG.warn("Hlog writers were interrupted, possible data loss!");
}
- };
- threadPool.execute(thread);
- }
- threadPool.shutdown();
- // Wait for all threads to terminate
- try {
- for(int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS) ; i++) {
- LOG.debug("Waiting for log writers to terminate, iteration #" + i);
}
- }catch(InterruptedException ex) {
- LOG.warn("Log writers were interrupted, possible data loss!");
- }
- } finally {
- for (SequenceFile.Writer w : logWriters.values()) {
- w.close();
+ } finally {
+ // Iterating a synchronizedMap view requires holding the map's lock.
+ synchronized (logWriters) {
+ for (SequenceFile.Writer w : logWriters.values()) {
+ w.close();
+ }
+ }
}
}
- }
/**
* Utility class that lets us keep track of the edit with it's key