You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2010/12/20 21:36:18 UTC
svn commit: r1051279 - in /hbase/branches/0.90: ./
src/main/java/org/apache/hadoop/hbase/master/
src/main/java/org/apache/hadoop/hbase/regionserver/wal/
src/test/java/org/apache/hadoop/hbase/regionserver/wal/
Author: stack
Date: Mon Dec 20 20:36:17 2010
New Revision: 1051279
URL: http://svn.apache.org/viewvc?rev=1051279&view=rev
Log:
HBASE-3323 OOME in master splitting logs
Modified:
hbase/branches/0.90/CHANGES.txt
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
Modified: hbase/branches/0.90/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/CHANGES.txt?rev=1051279&r1=1051278&r2=1051279&view=diff
==============================================================================
--- hbase/branches/0.90/CHANGES.txt (original)
+++ hbase/branches/0.90/CHANGES.txt Mon Dec 20 20:36:17 2010
@@ -761,6 +761,7 @@ Release 0.90.0 - Unreleased
HBASE-3370 ReplicationSource.openReader fails to locate HLogs when they
aren't split yet
HBASE-3371 Race in TestReplication can make it fail
+ HBASE-3323 OOME in master splitting logs
IMPROVEMENTS
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java?rev=1051279&r1=1051278&r2=1051279&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java Mon Dec 20 20:36:17 2010
@@ -190,12 +190,13 @@ public class MasterFileSystem {
long splitTime = 0, splitLogSize = 0;
Path logDir = new Path(this.rootdir, HLog.getHLogDirectoryName(serverName));
try {
- HLogSplitter splitter = HLogSplitter.createLogSplitter(conf);
+ HLogSplitter splitter = HLogSplitter.createLogSplitter(
+ conf, rootdir, logDir, oldLogDir, this.fs);
try {
- splitter.splitLog(this.rootdir, logDir, oldLogDir, this.fs, conf);
+ splitter.splitLog();
} catch (OrphanHLogAfterSplitException e) {
LOG.warn("Retrying splitting because of:", e);
- splitter.splitLog(this.rootdir, logDir, oldLogDir, this.fs, conf);
+ splitter.splitLog();
}
splitTime = splitter.getTime();
splitLogSize = splitter.getSize();
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1051279&r1=1051278&r2=1051279&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Mon Dec 20 20:36:17 2010
@@ -1439,8 +1439,9 @@ public class HLog implements Syncable {
throw new IOException(p + " is not a directory");
}
- HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
- logSplitter.splitLog(baseDir, p, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(
+ conf, baseDir, p, oldLogDir, fs);
+ logSplitter.splitLog();
}
/**
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java?rev=1051279&r1=1051278&r2=1051279&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java Mon Dec 20 20:36:17 2010
@@ -160,6 +160,32 @@ public class HLogKey implements Writable
return result;
}
+ /**
+ * Drop this instance's tablename byte array and instead
+ * hold a reference to the provided tablename. This is not
+ * meant to be a general purpose setter - it's only used
+ * to collapse references to conserve memory.
+ */
+ void internTableName(byte []tablename) {
+ // We should not use this as a setter - only to swap
+ // in a new reference to the same table name.
+ assert Bytes.equals(tablename, this.tablename);
+ this.tablename = tablename;
+ }
+
+ /**
+ * Drop this instance's region name byte array and instead
+ * hold a reference to the provided region name. This is not
+ * meant to be a general purpose setter - it's only used
+ * to collapse references to conserve memory.
+ */
+ void internEncodedRegionName(byte []encodedRegionName) {
+ // We should not use this as a setter - only to swap
+ // in a new reference to the same encoded region name.
+ assert Bytes.equals(this.encodedRegionName, encodedRegionName);
+ this.encodedRegionName = encodedRegionName;
+ }
+
public void write(DataOutput out) throws IOException {
Bytes.writeByteArray(out, this.encodedRegionName);
Bytes.writeByteArray(out, this.tablename);
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1051279&r1=1051278&r2=1051279&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Mon Dec 20 20:36:17 2010
@@ -23,21 +23,18 @@ import static org.apache.hadoop.hbase.ut
import java.io.EOFException;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -45,6 +42,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
@@ -53,9 +51,11 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.io.MultipleIOException;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
/**
* This class is responsible for splitting up a bunch of regionserver commit log
@@ -66,74 +66,109 @@ public class HLogSplitter {
private static final String LOG_SPLITTER_IMPL = "hbase.hlog.splitter.impl";
- static final Log LOG = LogFactory.getLog(HLogSplitter.class);
-
- private long splitTime = 0;
- private long splitSize = 0;
-
/**
* Name of file that holds recovered edits written by the wal log splitting
* code, one per region
*/
public static final String RECOVERED_EDITS = "recovered.edits";
+
+ static final Log LOG = LogFactory.getLog(HLogSplitter.class);
+
+ private boolean hasSplit = false;
+ private long splitTime = 0;
+ private long splitSize = 0;
+
+
+ // Parameters for split process
+ protected final Path rootDir;
+ protected final Path srcDir;
+ protected final Path oldLogDir;
+ protected final FileSystem fs;
+ protected final Configuration conf;
+
+ // Major subcomponents of the split process.
+ // These are separated into inner classes to make testing easier.
+ OutputSink outputSink;
+ EntryBuffers entryBuffers;
+
+ // If an exception is thrown by one of the other threads, it will be
+ // stored here.
+ protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
+
+ // Wait/notify for when data has been produced by the reader thread,
+ // consumed by a writer thread, or an exception occurred
+ Object dataAvailable = new Object();
+
+
/**
* Create a new HLogSplitter using the given {@link Configuration} and the
* <code>hbase.hlog.splitter.impl</code> property to derive the instance
* class to use.
- *
- * @param conf
- * @return New HLogSplitter instance
- */
- public static HLogSplitter createLogSplitter(Configuration conf) {
+ *
+ * @param rootDir hbase directory
+ * @param srcDir logs directory
+ * @param oldLogDir directory where processed logs are archived to
+ * @param fs filesystem on which the log directories reside
+ * @return new HLogSplitter instance
+ */
+ public static HLogSplitter createLogSplitter(Configuration conf,
+ final Path rootDir, final Path srcDir,
+ Path oldLogDir, final FileSystem fs) {
+
@SuppressWarnings("unchecked")
Class<? extends HLogSplitter> splitterClass = (Class<? extends HLogSplitter>) conf
.getClass(LOG_SPLITTER_IMPL, HLogSplitter.class);
try {
- return splitterClass.newInstance();
+ Constructor<? extends HLogSplitter> constructor =
+ splitterClass.getConstructor(
+ Configuration.class, // conf
+ Path.class, // rootDir
+ Path.class, // srcDir
+ Path.class, // oldLogDir
+ FileSystem.class); // fs
+ return constructor.newInstance(conf, rootDir, srcDir, oldLogDir, fs);
+ } catch (IllegalArgumentException e) {
+ throw new RuntimeException(e);
} catch (InstantiationException e) {
throw new RuntimeException(e);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
+ } catch (InvocationTargetException e) {
+ throw new RuntimeException(e);
+ } catch (SecurityException e) {
+ throw new RuntimeException(e);
+ } catch (NoSuchMethodException e) {
+ throw new RuntimeException(e);
}
}
-
-
- // Private immutable datastructure to hold Writer and its Path.
- private final static class WriterAndPath {
- final Path p;
- final Writer w;
-
- WriterAndPath(final Path p, final Writer w) {
- this.p = p;
- this.w = w;
- }
+ public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
+ Path oldLogDir, FileSystem fs) {
+ this.conf = conf;
+ this.rootDir = rootDir;
+ this.srcDir = srcDir;
+ this.oldLogDir = oldLogDir;
+ this.fs = fs;
+
+ entryBuffers = new EntryBuffers(
+ conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
+ 128*1024*1024));
+ outputSink = new OutputSink();
}
-
+
/**
* Split up a bunch of regionserver commit log files that are no longer being
* written to, into new files, one per region for region to replay on startup.
* Delete the old log files when finished.
*
- * @param rootDir
- * qualified root directory of the HBase instance
- * @param srcDir
- * Directory of log files to split: e.g.
- * <code>${ROOTDIR}/log_HOST_PORT</code>
- * @param oldLogDir
- * directory where processed (split) logs will be archived to
- * @param fs
- * FileSystem
- * @param conf
- * Configuration
- * @throws IOException
- * will throw if corrupted hlogs aren't tolerated
+ * @throws IOException will throw if corrupted hlogs aren't tolerated
* @return the list of splits
*/
- public List<Path> splitLog(final Path rootDir, final Path srcDir,
- Path oldLogDir, final FileSystem fs, final Configuration conf)
+ public List<Path> splitLog()
throws IOException {
+ Preconditions.checkState(!hasSplit,
+ "An HLogSplitter instance may only be used once");
+ hasSplit = true;
long startTime = System.currentTimeMillis();
List<Path> splits = null;
@@ -148,29 +183,8 @@ public class HLogSplitter {
}
LOG.info("Splitting " + logfiles.length + " hlog(s) in "
+ srcDir.toString());
- splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
- try {
- FileStatus[] files = fs.listStatus(srcDir);
- for (FileStatus file : files) {
- Path newPath = HLog.getHLogArchivePath(oldLogDir, file.getPath());
- LOG.info("Moving " + FSUtils.getPath(file.getPath()) + " to "
- + FSUtils.getPath(newPath));
- if (!fs.rename(file.getPath(), newPath)) {
- throw new IOException("Unable to rename " + file.getPath() +
- " to " + newPath);
- }
- }
- LOG.debug("Moved " + files.length + " log files to "
- + FSUtils.getPath(oldLogDir));
- if (!fs.delete(srcDir, true)) {
- throw new IOException("Unable to delete " + srcDir);
- }
- } catch (IOException e) {
- e = RemoteExceptionHandler.checkIOException(e);
- IOException io = new IOException("Cannot delete: " + srcDir);
- io.initCause(e);
- throw io;
- }
+ splits = splitLog(logfiles);
+
splitTime = System.currentTimeMillis() - startTime;
LOG.info("hlog file splitting completed in " + splitTime +
" ms for " + srcDir.toString());
@@ -190,100 +204,77 @@ public class HLogSplitter {
public long getSize() {
return this.splitSize;
}
+
+ /**
+ * @return a map from encoded region ID to the number of edits written out
+ * for that region.
+ */
+ Map<byte[], Long> getOutputCounts() {
+ Preconditions.checkState(hasSplit);
+ return outputSink.getOutputCounts();
+ }
/**
- * Sorts the HLog edits in the given list of logfiles (that are a mix of edits
+ * Splits the HLog edits in the given list of logfiles (that are a mix of edits
* on multiple regions) by region and then splits them per region directories,
* in batches of (hbase.hlog.split.batch.size)
*
- * A batch consists of a set of log files that will be sorted in a single map
- * of edits indexed by region the resulting map will be concurrently written
- * by multiple threads to their corresponding regions
+ * This process is split into multiple threads. In the main thread, we loop
+ * through the logs to be split. For each log, we:
+ * <ul>
+ * <li> Recover it (take and drop HDFS lease) to ensure no other process can write</li>
+ * <li> Read each edit (see {@link #parseHLog})</li>
+ * <li> Mark as "processed" or "corrupt" depending on outcome</li>
+ * </ul>
*
- * Each batch consists of more more log files that are - recovered (files is
- * opened for append then closed to ensure no process is writing into it) -
- * parsed (each edit in the log is appended to a list of edits indexed by
- * region see {@link #parseHLog} for more details) - marked as either
- * processed or corrupt depending on parsing outcome - the resulting edits
- * indexed by region are concurrently written to their corresponding region
- * region directories - original files are then archived to a different
- * directory
+ * Each edit is passed into the EntryBuffers instance, which takes care of
+ * memory accounting and splitting the edits by region.
*
+ * The OutputSink object then manages N other WriterThreads which pull chunks
+ * of edits from EntryBuffers and write them to the output region directories.
*
- *
- * @param rootDir
- * hbase directory
- * @param srcDir
- * logs directory
- * @param oldLogDir
- * directory where processed logs are archived to
- * @param logfiles
- * the list of log files to split
- * @param fs
- * @param conf
- * @return
- * @throws IOException
+ * After the process is complete, the log files are archived to a separate
+ * directory.
*/
- private List<Path> splitLog(final Path rootDir, final Path srcDir,
- Path oldLogDir, final FileStatus[] logfiles, final FileSystem fs,
- final Configuration conf) throws IOException {
+ private List<Path> splitLog(final FileStatus[] logfiles) throws IOException {
List<Path> processedLogs = new ArrayList<Path>();
List<Path> corruptedLogs = new ArrayList<Path>();
- final Map<byte[], WriterAndPath> logWriters = Collections
- .synchronizedMap(new TreeMap<byte[], WriterAndPath>(
- Bytes.BYTES_COMPARATOR));
List<Path> splits = null;
- // Number of logs in a read batch
- // More means faster but bigger mem consumption
- // TODO make a note on the conf rename and update hbase-site.xml if needed
- int logFilesPerStep = conf.getInt("hbase.hlog.split.batch.size", 3);
boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false);
splitSize = 0;
+
+ outputSink.startWriterThreads(entryBuffers);
try {
- int i = -1;
- while (i < logfiles.length) {
- final Map<byte[], LinkedList<Entry>> editsByRegion = new TreeMap<byte[], LinkedList<Entry>>(
- Bytes.BYTES_COMPARATOR);
- for (int j = 0; j < logFilesPerStep; j++) {
- i++;
- if (i == logfiles.length) {
- break;
- }
- FileStatus log = logfiles[i];
- Path logPath = log.getPath();
- long logLength = log.getLen();
- splitSize += logLength;
- LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length
- + ": " + logPath + ", length=" + logLength);
- try {
- recoverFileLease(fs, logPath, conf);
- parseHLog(log, editsByRegion, fs, conf);
+ int i = 0;
+ for (FileStatus log : logfiles) {
+ Path logPath = log.getPath();
+ long logLength = log.getLen();
+ splitSize += logLength;
+ LOG.debug("Splitting hlog " + (i++ + 1) + " of " + logfiles.length
+ + ": " + logPath + ", length=" + logLength);
+ try {
+ recoverFileLease(fs, logPath, conf);
+ parseHLog(log, entryBuffers, fs, conf);
+ processedLogs.add(logPath);
+ } catch (IOException e) {
+ // If the IOE resulted from bad file format,
+ // then this problem is idempotent and retrying won't help
+ if (e.getCause() instanceof ParseException) {
+ LOG.warn("ParseException from hlog " + logPath + ". continuing");
processedLogs.add(logPath);
- } catch (EOFException eof) {
- // truncated files are expected if a RS crashes (see HBASE-2643)
- LOG.info("EOF from hlog " + logPath + ". continuing");
- processedLogs.add(logPath);
- } catch (IOException e) {
- // If the IOE resulted from bad file format,
- // then this problem is idempotent and retrying won't help
- if (e.getCause() instanceof ParseException) {
- LOG.warn("ParseException from hlog " + logPath + ". continuing");
- processedLogs.add(logPath);
+ } else {
+ if (skipErrors) {
+ LOG.info("Got while parsing hlog " + logPath +
+ ". Marking as corrupted", e);
+ corruptedLogs.add(logPath);
} else {
- if (skipErrors) {
- LOG.info("Got while parsing hlog " + logPath +
- ". Marking as corrupted", e);
- corruptedLogs.add(logPath);
- } else {
- throw e;
- }
+ throw e;
}
}
}
- writeEditsBatchToRegions(editsByRegion, logWriters, rootDir, fs, conf);
}
if (fs.listStatus(srcDir).length > processedLogs.size()
+ corruptedLogs.size()) {
@@ -291,86 +282,13 @@ public class HLogSplitter {
"Discovered orphan hlog after split. Maybe the "
+ "HRegionServer was not dead when we started");
}
- archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
+ archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf);
} finally {
- splits = new ArrayList<Path>(logWriters.size());
- for (WriterAndPath wap : logWriters.values()) {
- wap.w.close();
- splits.add(wap.p);
- LOG.debug("Closed " + wap.p);
- }
+ splits = outputSink.finishWritingAndClose();
}
return splits;
}
-
- /**
- * Takes splitLogsMap and concurrently writes them to region directories using a thread pool
- *
- * @param splitLogsMap map that contains the log splitting result indexed by region
- * @param logWriters map that contains a writer per region
- * @param rootDir hbase root dir
- * @param fs
- * @param conf
- * @throws IOException
- */
- private void writeEditsBatchToRegions(
- final Map<byte[], LinkedList<Entry>> splitLogsMap,
- final Map<byte[], WriterAndPath> logWriters, final Path rootDir,
- final FileSystem fs, final Configuration conf)
- throws IOException {
- // Number of threads to use when log splitting to rewrite the logs.
- // More means faster but bigger mem consumption.
- int logWriterThreads = conf.getInt(
- "hbase.regionserver.hlog.splitlog.writer.threads", 3);
- boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
- HashMap<byte[], Future> writeFutureResult = new HashMap<byte[], Future>();
- ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
- builder.setNameFormat("SplitWriter-%1$d");
- ThreadFactory factory = builder.build();
- ThreadPoolExecutor threadPool =
- (ThreadPoolExecutor)Executors.newFixedThreadPool(logWriterThreads, factory);
- for (final byte[] region : splitLogsMap.keySet()) {
- Callable splitter = createNewSplitter(rootDir, logWriters, splitLogsMap,
- region, fs, conf);
- writeFutureResult.put(region, threadPool.submit(splitter));
- }
-
- threadPool.shutdown();
- // Wait for all threads to terminate
- try {
- for (int j = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); j++) {
- String message = "Waiting for hlog writers to terminate, elapsed " + j
- * 5 + " seconds";
- if (j < 30) {
- LOG.debug(message);
- } else {
- LOG.info(message);
- }
-
- }
- } catch (InterruptedException ex) {
- LOG.warn("Hlog writers were interrupted, possible data loss!");
- if (!skipErrors) {
- throw new IOException("Could not finish writing log entries", ex);
- // TODO maybe we should fail here regardless if skipErrors is active or not
- }
- }
-
- for (Map.Entry<byte[], Future> entry : writeFutureResult.entrySet()) {
- try {
- entry.getValue().get();
- } catch (ExecutionException e) {
- throw (new IOException(e.getCause()));
- } catch (InterruptedException e1) {
- LOG.warn("Writer for region " + Bytes.toString(entry.getKey())
- + " was interrupted, however the write process should have "
- + "finished. Throwing up ", e1);
- throw (new IOException(e1.getCause()));
- }
- }
- }
-
/**
* Moves processed logs to a oldLogDir after successful processing Moves
* corrupted logs (any log that couldn't be successfully parsed to corruptDir
@@ -383,7 +301,9 @@ public class HLogSplitter {
* @param conf
* @throws IOException
*/
- private static void archiveLogs(final List<Path> corruptedLogs,
+ private static void archiveLogs(
+ final Path srcDir,
+ final List<Path> corruptedLogs,
final List<Path> processedLogs, final Path oldLogDir,
final FileSystem fs, final Configuration conf) throws IOException {
final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get(
@@ -411,6 +331,10 @@ public class HLogSplitter {
LOG.info("Archived processed log " + p + " to " + newPath);
}
}
+
+ if (!fs.delete(srcDir, true)) {
+ throw new IOException("Unable to delete src dir: " + srcDir);
+ }
}
/**
@@ -460,7 +384,7 @@ public class HLogSplitter {
* @throws IOException if hlog is corrupted, or can't be open
*/
private void parseHLog(final FileStatus logfile,
- final Map<byte[], LinkedList<Entry>> splitLogsMap, final FileSystem fs,
+ EntryBuffers entryBuffers, final FileSystem fs,
final Configuration conf)
throws IOException {
// Check for possibly empty file. With appends, currently Hadoop reports a
@@ -490,15 +414,11 @@ public class HLogSplitter {
try {
Entry entry;
while ((entry = in.next()) != null) {
- byte[] region = entry.getKey().getEncodedRegionName();
- LinkedList<Entry> queue = splitLogsMap.get(region);
- if (queue == null) {
- queue = new LinkedList<Entry>();
- splitLogsMap.put(region, queue);
- }
- queue.addLast(entry);
+ entryBuffers.appendEntry(entry);
editsCount++;
}
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
} finally {
LOG.debug("Pushed=" + editsCount + " entries from " + path);
try {
@@ -506,76 +426,30 @@ public class HLogSplitter {
in.close();
}
} catch (IOException e) {
- LOG
- .warn("Close log reader in finally threw exception -- continuing",
- e);
+ LOG.warn("Close log reader in finally threw exception -- continuing",
+ e);
}
}
}
-
- private Callable<Void> createNewSplitter(final Path rootDir,
- final Map<byte[], WriterAndPath> logWriters,
- final Map<byte[], LinkedList<Entry>> logEntries, final byte[] region,
- final FileSystem fs, final Configuration conf) {
- return new Callable<Void>() {
- public String getName() {
- return "Split writer thread for region " + Bytes.toStringBinary(region);
- }
- @Override
- public Void call() throws IOException {
- LinkedList<Entry> entries = logEntries.get(region);
- LOG.debug(this.getName() + " got " + entries.size() + " to process");
- long threadTime = System.currentTimeMillis();
- try {
- int editsCount = 0;
- WriterAndPath wap = logWriters.get(region);
- for (Entry logEntry : entries) {
- if (wap == null) {
- Path regionedits = getRegionSplitEditsPath(fs, logEntry, rootDir);
- if (regionedits == null) {
- // we already print a message if it's null in getRegionSplitEditsPath
- break;
- }
- if (fs.exists(regionedits)) {
- LOG.warn("Found existing old edits file. It could be the "
- + "result of a previous failed split attempt. Deleting "
- + regionedits + ", length="
- + fs.getFileStatus(regionedits).getLen());
- if (!fs.delete(regionedits, false)) {
- LOG.warn("Failed delete of old " + regionedits);
- }
- }
- Writer w = createWriter(fs, regionedits, conf);
- wap = new WriterAndPath(regionedits, w);
- logWriters.put(region, wap);
- LOG.debug("Creating writer path=" + regionedits + " region="
- + Bytes.toStringBinary(region));
- }
- wap.w.append(logEntry);
- editsCount++;
- }
- LOG.debug(this.getName() + " Applied " + editsCount
- + " total edits to " + Bytes.toStringBinary(region) + " in "
- + (System.currentTimeMillis() - threadTime) + "ms");
- } catch (IOException e) {
- e = RemoteExceptionHandler.checkIOException(e);
- LOG.fatal(this.getName() + " Got while writing log entry to log", e);
- throw e;
- }
- return null;
- }
- };
+ private void writerThreadError(Throwable t) {
+ thrown.compareAndSet(null, t);
+ }
+
+ /**
+ * Check for errors in the writer threads. If any is found, rethrow it.
+ */
+ private void checkForErrors() throws IOException {
+ Throwable thrown = this.thrown.get();
+ if (thrown == null) return;
+ if (thrown instanceof IOException) {
+ throw (IOException)thrown;
+ } else {
+ throw new RuntimeException(thrown);
+ }
}
-
/**
* Create a new {@link Writer} for writing log splits.
- *
- * @param fs
- * @param logfile
- * @param conf
- * @return A new Writer instance
- * @throws IOException
*/
protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
throws IOException {
@@ -584,16 +458,410 @@ public class HLogSplitter {
/**
* Create a new {@link Reader} for reading logs to split.
- *
- * @param fs
- * @param curLogFile
- * @param conf
- * @return A new Reader instance
- * @throws IOException
*/
protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf)
throws IOException {
return HLog.getReader(fs, curLogFile, conf);
}
+
+  /**
+   * Class which accumulates edits and separates them into a buffer per region
+   * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses
+   * a predefined threshold.
+   *
+   * Writer threads then pull region-specific buffers from this class.
+   *
+   * Locking: <code>buffers</code> and <code>currentlyWriting</code> are guarded
+   * by this object's monitor; <code>totalBuffered</code> is guarded by the
+   * enclosing splitter's <code>dataAvailable</code> monitor. The two monitors
+   * are never held at the same time.
+   */
+  class EntryBuffers {
+    Map<byte[], RegionEntryBuffer> buffers =
+      new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);
+
+    /* Track which regions are currently in the middle of writing. We don't allow
+       an IO thread to pick up bytes from a region if we're already writing
+       data for that region in a different IO thread. */
+    Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+
+    /* Heap accounted for edits that were appended and not yet reported
+       through doneWriting(). */
+    long totalBuffered = 0;
+    long maxHeapUsage;
+
+    EntryBuffers(long maxHeapUsage) {
+      this.maxHeapUsage = maxHeapUsage;
+    }
+
+    /**
+     * Append a log entry into the corresponding region buffer.
+     * Blocks if the total heap usage has crossed the specified threshold,
+     * until the writer threads free up space or one of them reports an error.
+     *
+     * @param entry the edit to buffer
+     * @throws InterruptedException if interrupted while waiting for space
+     * @throws IOException if a writer thread has recorded an IOException
+     */
+    void appendEntry(Entry entry) throws InterruptedException, IOException {
+      HLogKey key = entry.getKey();
+      byte[] encodedRegionName = key.getEncodedRegionName();
+
+      long incrHeap;
+      synchronized (this) {
+        RegionEntryBuffer buffer = buffers.get(encodedRegionName);
+        if (buffer == null) {
+          buffer = new RegionEntryBuffer(key.getTablename(), encodedRegionName);
+          buffers.put(encodedRegionName, buffer);
+        }
+        incrHeap = buffer.appendEntry(entry);
+      }
+
+      synchronized (dataAvailable) {
+        // Account under the same monitor doneWriting() uses, so concurrent
+        // increments and decrements of totalBuffered cannot be lost.
+        totalBuffered += incrHeap;
+
+        // If we crossed the chunk threshold, wait for more space to be available.
+        // 'thrown' is a holder object and is never null itself, so its content
+        // must be inspected with get(); otherwise this loop would never block.
+        while (totalBuffered > maxHeapUsage && thrown.get() == null) {
+          LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
+          dataAvailable.wait(3000);
+        }
+        // Wake up writer threads waiting for data to show up
+        dataAvailable.notifyAll();
+      }
+      checkForErrors();
+    }
+
+    /**
+     * Hand out the pending region buffer with the largest heap size whose
+     * region is not currently being written by another thread. The buffer is
+     * removed from the pending set and its region is marked as being written
+     * until {@link #doneWriting(RegionEntryBuffer)} is called.
+     *
+     * @return the chosen buffer, or null if no non-empty, eligible buffer exists
+     */
+    synchronized RegionEntryBuffer getChunkToWrite() {
+      long biggestSize=0;
+      byte[] biggestBufferKey=null;
+
+      for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
+        long size = entry.getValue().heapSize();
+        if (size > biggestSize && !currentlyWriting.contains(entry.getKey())) {
+          biggestSize = size;
+          biggestBufferKey = entry.getKey();
+        }
+      }
+      if (biggestBufferKey == null) {
+        return null;
+      }
+
+      RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
+      currentlyWriting.add(biggestBufferKey);
+      return buffer;
+    }
+
+    /**
+     * Report that a buffer obtained from {@link #getChunkToWrite()} has been
+     * dealt with: its region may be picked up again and its heap is released
+     * from the accounting.
+     *
+     * @param buffer the buffer that was handed out earlier
+     */
+    void doneWriting(RegionEntryBuffer buffer) {
+      synchronized (this) {
+        boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
+        assert removed;
+      }
+      long size = buffer.heapSize();
+
+      synchronized (dataAvailable) {
+        totalBuffered -= size;
+        // We may unblock writers
+        dataAvailable.notifyAll();
+      }
+    }
+
+    /**
+     * @param region encoded region name
+     * @return true if a buffer for the region has been handed out and not yet
+     *   reported as done
+     */
+    synchronized boolean isRegionCurrentlyWriting(byte[] region) {
+      return currentlyWriting.contains(region);
+    }
+  }
+
+  /**
+   * The edits buffered for one region, together with a running total of the
+   * heap they are estimated to occupy.
+   * Every appended entry's key is pointed at this buffer's table-name and
+   * region-name byte arrays, so that all entries share a single instance of
+   * each instead of carrying their own copy.
+   */
+  static class RegionEntryBuffer implements HeapSize {
+    long heapInBuffer = 0;
+    List<Entry> entryBuffer;
+    byte[] tableName;
+    byte[] encodedRegionName;
+
+    RegionEntryBuffer(byte[] table, byte[] region) {
+      this.entryBuffer = new LinkedList<Entry>();
+      this.tableName = table;
+      this.encodedRegionName = region;
+    }
+
+    /**
+     * Adds an entry to this buffer.
+     *
+     * @param entry the edit to buffer
+     * @return the heap size accounted for the added entry
+     */
+    long appendEntry(Entry entry) {
+      internify(entry);
+      entryBuffer.add(entry);
+
+      long editHeap = entry.getEdit().heapSize();
+      // HLogKey pointers
+      long keyPointerHeap = ClassSize.align(2 * ClassSize.REFERENCE);
+      // TODO linkedlist entry (not accounted for yet)
+      long added = editHeap + keyPointerHeap;
+
+      heapInBuffer += added;
+      return added;
+    }
+
+    /** Makes the entry's key reuse this buffer's name arrays. */
+    private void internify(Entry entry) {
+      HLogKey entryKey = entry.getKey();
+      entryKey.internTableName(this.tableName);
+      entryKey.internEncodedRegionName(this.encodedRegionName);
+    }
+
+    /** @return the heap accounted for all entries appended so far */
+    public long heapSize() {
+      return heapInBuffer;
+    }
+  }
+
+
+  /**
+   * Thread that repeatedly takes a region buffer from <code>entryBuffers</code>
+   * and appends its edits to the writer handed out by <code>outputSink</code>.
+   * Any failure is logged and recorded through writerThreadError(Throwable),
+   * after which the thread terminates.
+   */
+  class WriterThread extends Thread {
+    private volatile boolean shouldStop = false;
+
+    WriterThread(int i) {
+      super("WriterThread-" + i);
+    }
+
+    public void run() {
+      try {
+        doRun();
+      } catch (Throwable t) {
+        LOG.error("Error in log splitting write thread", t);
+        writerThreadError(t);
+      }
+    }
+
+    /**
+     * Main loop: write out chunks until {@link #finish()} has been called and
+     * no more chunks are available.
+     */
+    private void doRun() throws IOException {
+      LOG.debug("Writer thread " + this + ": starting");
+      while (true) {
+        RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
+        if (buffer == null) {
+          // No data currently available, wait on some more to show up
+          synchronized (dataAvailable) {
+            if (shouldStop) return;
+            try {
+              dataAvailable.wait(1000);
+            } catch (InterruptedException ie) {
+              if (!shouldStop) {
+                throw new RuntimeException(ie);
+              }
+            }
+          }
+          continue;
+        }
+
+        try {
+          writeBuffer(buffer);
+        } finally {
+          // Always release the region and its accounted heap, even on failure
+          entryBuffers.doneWriting(buffer);
+        }
+      }
+    }
+
+    /**
+     * Appends every edit of the buffer to the output for its region.
+     * Nothing is written when the buffer is empty or when the output sink
+     * returns no writer for the region.
+     *
+     * @throws IOException if obtaining the writer or appending fails
+     */
+    private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
+      List<Entry> entries = buffer.entryBuffer;
+      if (entries.isEmpty()) {
+        LOG.warn(this.getName() + " got an empty buffer, skipping");
+        return;
+      }
+
+      WriterAndPath wap = null;
+
+      long startTime = System.nanoTime();
+      try {
+        int editsCount = 0;
+
+        for (Entry logEntry : entries) {
+          if (wap == null) {
+            wap = outputSink.getWriterAndPath(logEntry);
+            if (wap == null) {
+              // getWriterAndPath decided we don't need to write these edits
+              // Message was already logged
+              return;
+            }
+          }
+          wap.w.append(logEntry);
+          editsCount++;
+        }
+        // Pass along summary statistics
+        wap.incrementEdits(editsCount);
+        wap.incrementNanoTime(System.nanoTime() - startTime);
+      } catch (IOException e) {
+        e = RemoteExceptionHandler.checkIOException(e);
+        LOG.fatal(this.getName() + " Got while writing log entry to log", e);
+        throw e;
+      }
+    }
+
+    /**
+     * Asks this thread to exit once no more chunks are available.
+     */
+    void finish() {
+      // Set the flag and notify under the monitor the thread waits on, so an
+      // idle thread sees the request right away instead of sleeping out the
+      // remainder of its wait timeout.
+      synchronized (dataAvailable) {
+        shouldStop = true;
+        dataAvailable.notifyAll();
+      }
+    }
+  }
+
+  /**
+   * Class that manages the output streams from the log splitting process.
+   */
+  class OutputSink {
+    private final Map<byte[], WriterAndPath> logWriters = Collections.synchronizedMap(
+      new TreeMap<byte[], WriterAndPath>(Bytes.BYTES_COMPARATOR));
+    private final List<WriterThread> writerThreads = Lists.newArrayList();
+
+    /* Set of regions which we've decided should not output edits */
+    private final Set<byte[]> blacklistedRegions = Collections.synchronizedSet(
+      new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
+
+    private boolean hasClosed = false;
+
+    /**
+     * Start the threads that will pump data from the entryBuffers
+     * to the output files. The number of threads is read from the
+     * configuration and defaults to 3.
+     *
+     * @param entryBuffers not used by this method itself
+     */
+    synchronized void startWriterThreads(EntryBuffers entryBuffers) {
+      // More threads could potentially write faster at the expense
+      // of causing more disk seeks as the logs are split.
+      // After a certain setting (probably around 3) the
+      // process will be bound on the reader in the current
+      // implementation anyway.
+      int numThreads = conf.getInt(
+        "hbase.regionserver.hlog.splitlog.writer.threads", 3);
+
+      for (int i = 0; i < numThreads; i++) {
+        WriterThread t = new WriterThread(i);
+        t.start();
+        writerThreads.add(t);
+      }
+    }
+
+    /**
+     * Asks all writer threads to finish, waits for each of them and then
+     * closes the output streams.
+     *
+     * @return the list of paths written
+     * @throws IOException if interrupted while waiting, if a writer thread
+     *   recorded an IOException, or if closing an output fails
+     */
+    List<Path> finishWritingAndClose() throws IOException {
+      LOG.info("Waiting for split writer threads to finish");
+      for (WriterThread t : writerThreads) {
+        t.finish();
+      }
+      for (WriterThread t: writerThreads) {
+        try {
+          t.join();
+        } catch (InterruptedException ie) {
+          throw new IOException(ie);
+        }
+        checkForErrors();
+      }
+      LOG.info("Split writers finished");
+
+      return closeStreams();
+    }
+
+    /**
+     * Close all of the output streams. Must not be called more than once
+     * after it has completed successfully.
+     *
+     * @return the list of paths written.
+     * @throws IOException if one or more writers failed to close; the other
+     *   writers are still closed before the failure is reported
+     */
+    private List<Path> closeStreams() throws IOException {
+      Preconditions.checkState(!hasClosed);
+
+      List<Path> paths = new ArrayList<Path>();
+      List<IOException> closeFailures = Lists.newArrayList();
+
+      // A synchronized map's views must be iterated while holding the map's
+      // lock. Take a snapshot under the lock so the (possibly slow) close
+      // calls run outside of it.
+      List<WriterAndPath> toClose;
+      synchronized (logWriters) {
+        toClose = new ArrayList<WriterAndPath>(logWriters.values());
+      }
+
+      for (WriterAndPath wap : toClose) {
+        try {
+          wap.w.close();
+        } catch (IOException ioe) {
+          LOG.error("Couldn't close log at " + wap.p, ioe);
+          closeFailures.add(ioe);
+          continue;
+        }
+        paths.add(wap.p);
+        LOG.info("Closed path " + wap.p +" (wrote " + wap.editsWritten + " edits in "
+          + (wap.nanosSpent / 1000/ 1000) + "ms)");
+      }
+      if (!closeFailures.isEmpty()) {
+        throw MultipleIOException.createIOException(closeFailures);
+      }
+
+      hasClosed = true;
+      return paths;
+    }
+
+    /**
+     * Get a writer and path for a log starting at the given entry.
+     *
+     * This function is threadsafe so long as multiple threads are always
+     * acting on different regions.
+     *
+     * @param entry an edit of the region for which output is wanted
+     * @return null if this region shouldn't output any logs
+     * @throws IOException if preparing the output path or creating the
+     *   writer fails
+     */
+    WriterAndPath getWriterAndPath(Entry entry) throws IOException {
+
+      byte region[] = entry.getKey().getEncodedRegionName();
+      WriterAndPath ret = logWriters.get(region);
+      if (ret != null) {
+        return ret;
+      }
+
+      // If we already decided that this region doesn't get any output
+      // we don't need to check again.
+      if (blacklistedRegions.contains(region)) {
+        return null;
+      }
+
+      // Need to create writer
+      Path regionedits = getRegionSplitEditsPath(fs,
+        entry, rootDir);
+      if (regionedits == null) {
+        // Edits dir doesn't exist
+        blacklistedRegions.add(region);
+        return null;
+      }
+      deletePreexistingOldEdits(regionedits);
+      Writer w = createWriter(fs, regionedits, conf);
+      ret = new WriterAndPath(regionedits, w);
+      logWriters.put(region, ret);
+      LOG.debug("Creating writer path=" + regionedits + " region="
+        + Bytes.toStringBinary(region));
+
+      return ret;
+    }
+
+    /**
+     * If the specified path exists, issue a warning and delete it.
+     * A failed delete is only logged.
+     */
+    private void deletePreexistingOldEdits(Path regionedits) throws IOException {
+      if (fs.exists(regionedits)) {
+        LOG.warn("Found existing old edits file. It could be the "
+          + "result of a previous failed split attempt. Deleting "
+          + regionedits + ", length="
+          + fs.getFileStatus(regionedits).getLen());
+        if (!fs.delete(regionedits, false)) {
+          LOG.warn("Failed delete of old " + regionedits);
+        }
+      }
+    }
+
+    /**
+     * @return a map from encoded region ID to the number of edits written out
+     * for that region.
+     */
+    private Map<byte[], Long> getOutputCounts() {
+      TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(
+        Bytes.BYTES_COMPARATOR);
+      synchronized (logWriters) {
+        for (Map.Entry<byte[], WriterAndPath> entry : logWriters.entrySet()) {
+          ret.put(entry.getKey(), entry.getValue().editsWritten);
+        }
+      }
+      return ret;
+    }
+  }
+
+  /**
+   * Pairs an output Writer with the Path it writes to and keeps simple
+   * statistics (edit count, time spent) about what went into that output.
+   */
+  private final static class WriterAndPath {
+    final Path p;
+    final Writer w;
+
+    /* Number of nanos spent writing to this log */
+    long nanosSpent = 0;
+    /* Count of edits written to this path */
+    long editsWritten = 0;
+
+    WriterAndPath(final Path p, final Writer w) {
+      this.w = w;
+      this.p = p;
+    }
+
+    /** Adds the given number of edits to the running count. */
+    void incrementEdits(int edits) {
+      this.editsWritten = this.editsWritten + edits;
+    }
+
+    /** Adds the given number of nanoseconds to the time spent writing. */
+    void incrementNanoTime(long nanos) {
+      this.nanosSpent = this.nanosSpent + nanos;
+    }
+  }
}
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java?rev=1051279&r1=1051278&r2=1051279&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java Mon Dec 20 20:36:17 2010
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
+import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -66,7 +67,7 @@ import org.apache.hadoop.io.Writable;
* is an old style KeyValue or the new style WALEdit.
*
*/
-public class WALEdit implements Writable {
+public class WALEdit implements Writable, HeapSize {
private final int VERSION_2 = -1;
@@ -154,7 +155,19 @@ public class WALEdit implements Writable
out.writeInt(scopes.get(key));
}
}
+ }
+  /**
+   * @return an estimate of the heap used by this edit: the sum of the heap
+   *   sizes of its KeyValues plus, when scopes are present, an allowance for
+   *   the scopes map
+   */
+  public long heapSize() {
+    long total = 0;
+    for (KeyValue keyValue : kvs) {
+      total += keyValue.heapSize();
+    }
+    if (scopes == null) {
+      return total;
+    }
+    // TODO this isn't quite right, need help here
+    total += ClassSize.TREEMAP;
+    total += ClassSize.align(scopes.size() * ClassSize.MAP_ENTRY);
+    return total;
   }
public String toString() {
Modified: hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1051279&r1=1051278&r2=1051279&view=diff
==============================================================================
--- hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Mon Dec 20 20:36:17 2010
@@ -164,10 +164,10 @@ public class TestHLog {
log.rollWriter();
}
log.close();
- HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+ hbaseDir, logdir, this.oldLogDir, this.fs);
List<Path> splits =
- logSplitter.splitLog(hbaseDir, logdir,
- this.oldLogDir, this.fs, conf);
+ logSplitter.splitLog();
verifySplits(splits, howmany);
log = null;
} finally {
Modified: hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java?rev=1051279&r1=1051278&r2=1051279&view=diff
==============================================================================
--- hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java (original)
+++ hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java Mon Dec 20 20:36:17 2010
@@ -24,16 +24,28 @@ import static org.junit.Assert.*;
import java.io.IOException;
import java.util.NavigableSet;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValueTestUtil;
+import org.apache.hadoop.hbase.MultithreadedTestUtil;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.EntryBuffers;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.RegionEntryBuffer;
+import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
+import static org.mockito.Mockito.mock;
/**
* Simple testing of a few HLog methods.
*/
public class TestHLogMethods {
+  // Region and table names shared by the log entries created in these tests.
+  private static final byte[] TEST_REGION = Bytes.toBytes("test_region");
+  private static final byte[] TEST_TABLE = Bytes.toBytes("test_table");
+
private final HBaseTestingUtility util = new HBaseTestingUtility();
/**
@@ -84,4 +96,71 @@ public class TestHLogMethods {
FSDataOutputStream fdos = fs.create(new Path(testdir, name), true);
fdos.close();
}
-}
\ No newline at end of file
+
+  /**
+   * A fresh RegionEntryBuffer reports zero heap; appending an entry makes the
+   * reported heap size positive.
+   */
+  @Test
+  public void testRegionEntryBuffer() throws Exception {
+    RegionEntryBuffer buffer = new RegionEntryBuffer(TEST_TABLE, TEST_REGION);
+    assertEquals(0, buffer.heapSize());
+
+    HLog.Entry firstEntry = createTestLogEntry(1);
+    buffer.appendEntry(firstEntry);
+    assertTrue(buffer.heapSize() > 0);
+  }
+
+  /**
+   * Exercises EntryBuffers without any writer threads: chunks are handed out
+   * one at a time per region, and the heap accounting returns to zero once
+   * every chunk has been reported as written.
+   */
+  @Test
+  public void testEntrySink() throws Exception {
+    Configuration conf = new Configuration();
+    HLogSplitter splitter = HLogSplitter.createLogSplitter(
+      conf, mock(Path.class), mock(Path.class), mock(Path.class),
+      mock(FileSystem.class));
+
+    EntryBuffers sink = splitter.new EntryBuffers(1*1024*1024);
+    for (int i = 0; i < 1000; i++) {
+      HLog.Entry entry = createTestLogEntry(i);
+      sink.appendEntry(entry);
+    }
+
+    assertTrue(sink.totalBuffered > 0);
+    long amountInChunk = sink.totalBuffered;
+    // Get a chunk. All entries belong to one region, so the chunk must
+    // account for everything buffered so far.
+    RegionEntryBuffer chunk = sink.getChunkToWrite();
+    // JUnit convention: expected value first, actual value second
+    assertEquals(amountInChunk, chunk.heapSize());
+
+    // Make sure it got marked that a thread is "working on this"
+    assertTrue(sink.isRegionCurrentlyWriting(TEST_REGION));
+
+    // Insert some more entries
+    for (int i = 0; i < 500; i++) {
+      HLog.Entry entry = createTestLogEntry(i);
+      sink.appendEntry(entry);
+    }
+    // Asking for another chunk shouldn't work since the first one
+    // is still writing
+    assertNull(sink.getChunkToWrite());
+
+    // If we say we're done writing the first chunk, then we should be able
+    // to get the second
+    sink.doneWriting(chunk);
+
+    RegionEntryBuffer chunk2 = sink.getChunkToWrite();
+    assertNotNull(chunk2);
+    assertNotSame(chunk, chunk2);
+    long amountInChunk2 = sink.totalBuffered;
+    // The second chunk had fewer rows than the first
+    assertTrue(amountInChunk2 < amountInChunk);
+
+    sink.doneWriting(chunk2);
+    assertEquals(0, sink.totalBuffered);
+  }
+
+  /**
+   * Builds a log entry for TEST_REGION / TEST_TABLE holding a single fixed
+   * KeyValue.
+   *
+   * @param i used as the sequence number and, multiplied by 1000, as the
+   *   write time of the entry's key
+   * @return the new entry
+   */
+  private HLog.Entry createTestLogEntry(int i) {
+    long seq = i;
+    // Multiply as long so that large values of i cannot overflow int
+    long now = i * 1000L;
+
+    WALEdit edit = new WALEdit();
+    edit.add(KeyValueTestUtil.create("row", "fam", "qual", 1234, "val"));
+    HLogKey key = new HLogKey(TEST_REGION, TEST_TABLE, seq, now);
+    return new HLog.Entry(key, edit);
+  }
+}
Modified: hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1051279&r1=1051278&r2=1051279&view=diff
==============================================================================
--- hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Mon Dec 20 20:36:17 2010
@@ -19,16 +19,14 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -39,7 +37,9 @@ import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
@@ -52,9 +52,16 @@ import org.apache.hadoop.hbase.util.Thre
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
/**
* Testing {@link HLog} splitting code.
@@ -119,11 +126,15 @@ public class TestHLogSplit {
@Before
public void setUp() throws Exception {
+ flushToConsole("Cleaning up cluster for new test\n"
+ + "--------------------------");
conf = TEST_UTIL.getConfiguration();
fs = TEST_UTIL.getDFSCluster().getFileSystem();
FileStatus[] entries = fs.listStatus(new Path("/"));
+ flushToConsole("Num entries in /:" + entries.length);
for (FileStatus dir : entries){
- fs.delete(dir.getPath(), true);
+ assertTrue("Deleting " + dir.getPath(),
+ fs.delete(dir.getPath(), true));
}
seq = 0;
regions = new ArrayList<String>();
@@ -161,18 +172,23 @@ public class TestHLogSplit {
public void testSplitFailsIfNewHLogGetsCreatedAfterSplitStarted()
throws IOException {
AtomicBoolean stop = new AtomicBoolean(false);
+
+ FileStatus[] stats = fs.listStatus(new Path("/hbase/t1"));
+ assertTrue("Previous test should clean up table dir",
+ stats == null || stats.length == 0);
+
generateHLogs(-1);
- fs.initialize(fs.getUri(), conf);
+
try {
(new ZombieNewLogWriterRegionServer(stop)).start();
- HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
- logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+ hbaseDir, hlogDir, oldLogDir, fs);
+ logSplitter.splitLog();
} finally {
stop.set(true);
}
}
-
@Test
public void testSplitPreservesEdits() throws IOException{
final String REGION = "region__1";
@@ -181,8 +197,9 @@ public class TestHLogSplit {
generateHLogs(1, 10, -1);
fs.initialize(fs.getUri(), conf);
- HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
- logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+ hbaseDir, hlogDir, oldLogDir, fs);
+ logSplitter.splitLog();
Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);
@@ -202,8 +219,9 @@ public class TestHLogSplit {
// initialize will create a new DFSClient with a new client ID
fs.initialize(fs.getUri(), conf);
- HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
- logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+ hbaseDir, hlogDir, oldLogDir, fs);
+ logSplitter.splitLog();
for (String region : regions) {
@@ -224,8 +242,9 @@ public class TestHLogSplit {
// initialize will create a new DFSClient with a new client ID
fs.initialize(fs.getUri(), conf);
- HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
- logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+ hbaseDir, hlogDir, oldLogDir, fs);
+ logSplitter.splitLog();
for (String region : regions) {
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@@ -240,8 +259,9 @@ public class TestHLogSplit {
fs.initialize(fs.getUri(), conf);
- HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
- logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+ hbaseDir, hlogDir, oldLogDir, fs);
+ logSplitter.splitLog();
for (String region : regions) {
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@@ -260,8 +280,9 @@ public class TestHLogSplit {
Corruptions.APPEND_GARBAGE, true, fs);
fs.initialize(fs.getUri(), conf);
- HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
- logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+ hbaseDir, hlogDir, oldLogDir, fs);
+ logSplitter.splitLog();
for (String region : regions) {
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
@@ -278,8 +299,9 @@ public class TestHLogSplit {
Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
fs.initialize(fs.getUri(), conf);
- HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
- logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+ hbaseDir, hlogDir, oldLogDir, fs);
+ logSplitter.splitLog();
for (String region : regions) {
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
assertEquals((NUM_WRITERS - 1) * ENTRIES, countHLog(logfile, fs, conf));
@@ -296,8 +318,9 @@ public class TestHLogSplit {
corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs);
fs.initialize(fs.getUri(), conf);
- HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
- logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+ hbaseDir, hlogDir, oldLogDir, fs);
+ logSplitter.splitLog();
for (String region : regions) {
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@@ -323,13 +346,13 @@ public class TestHLogSplit {
Path c1 = new Path(hlogDir, HLOG_FILE_PREFIX + "0");
conf.setClass("hbase.regionserver.hlog.reader.impl",
FaultySequenceFileLogReader.class, HLog.Reader.class);
- String[] failureTypes = { "begin", "middle", "end" };
for (FaultySequenceFileLogReader.FailureType failureType : FaultySequenceFileLogReader.FailureType.values()) {
conf.set("faultysequencefilelogreader.failuretype", failureType.name());
generateHLogs(1, ENTRIES, -1);
fs.initialize(fs.getUri(), conf);
- HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
- logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+ hbaseDir, hlogDir, oldLogDir, fs);
+ logSplitter.splitLog();
FileStatus[] archivedLogs = fs.listStatus(corruptDir);
assertEquals("expected a different file", c1.getName(), archivedLogs[0]
.getPath().getName());
@@ -358,8 +381,9 @@ public class TestHLogSplit {
conf.set("faultysequencefilelogreader.failuretype", FaultySequenceFileLogReader.FailureType.BEGINNING.name());
generateHLogs(Integer.MAX_VALUE);
fs.initialize(fs.getUri(), conf);
- HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
- logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+ hbaseDir, hlogDir, oldLogDir, fs);
+ logSplitter.splitLog();
} finally {
conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
Reader.class);
@@ -383,9 +407,10 @@ public class TestHLogSplit {
conf.set("faultysequencefilelogreader.failuretype", FaultySequenceFileLogReader.FailureType.BEGINNING.name());
generateHLogs(-1);
fs.initialize(fs.getUri(), conf);
- HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+ hbaseDir, hlogDir, oldLogDir, fs);
try {
- logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ logSplitter.splitLog();
} catch (IOException e) {
assertEquals(
"if skip.errors is false all files should remain in place",
@@ -413,8 +438,9 @@ public class TestHLogSplit {
corruptHLog(c1, Corruptions.TRUNCATE, true, fs);
fs.initialize(fs.getUri(), conf);
- HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
- logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+ hbaseDir, hlogDir, oldLogDir, fs);
+ logSplitter.splitLog();
Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);
@@ -437,8 +463,9 @@ public class TestHLogSplit {
generateHLogs(-1);
fs.initialize(fs.getUri(), conf);
- HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
- logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+ hbaseDir, hlogDir, oldLogDir, fs);
+ logSplitter.splitLog();
FileStatus[] archivedLogs = fs.listStatus(oldLogDir);
@@ -449,8 +476,9 @@ public class TestHLogSplit {
public void testSplit() throws IOException {
generateHLogs(-1);
fs.initialize(fs.getUri(), conf);
- HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
- logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+ hbaseDir, hlogDir, oldLogDir, fs);
+ logSplitter.splitLog();
for (String region : regions) {
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@@ -464,12 +492,16 @@ public class TestHLogSplit {
throws IOException {
generateHLogs(-1);
fs.initialize(fs.getUri(), conf);
- HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
- logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+ hbaseDir, hlogDir, oldLogDir, fs);
+ logSplitter.splitLog();
FileStatus [] statuses = null;
try {
statuses = fs.listStatus(hlogDir);
- assertNull(statuses);
+ if (statuses != null) {
+ Assert.fail("Files left in log dir: " +
+ Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
+ }
} catch (FileNotFoundException e) {
// hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
}
@@ -516,8 +548,9 @@ public class TestHLogSplit {
try {
zombie.start();
try {
- HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
- logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+ hbaseDir, hlogDir, oldLogDir, fs);
+ logSplitter.splitLog();
} catch (IOException ex) {/* expected */}
int logFilesNumber = fs.listStatus(hlogDir).length;
@@ -549,11 +582,12 @@ public class TestHLogSplit {
try {
InstrumentedSequenceFileLogWriter.activateFailure = true;
- HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
- logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+ hbaseDir, hlogDir, oldLogDir, fs);
+ logSplitter.splitLog();
} catch (IOException e) {
- assertEquals("java.io.IOException: This exception is instrumented and should only be thrown for testing", e.getMessage());
+ assertEquals("This exception is instrumented and should only be thrown for testing", e.getMessage());
throw e;
} finally {
InstrumentedSequenceFileLogWriter.activateFailure = false;
@@ -561,7 +595,10 @@ public class TestHLogSplit {
}
-// @Test
+ // @Test TODO this test has been disabled since it was created!
+ // It currently fails because the second split doesn't output anything
+ // -- because there are no region dirs after we move aside the first
+ // split result
public void testSplittingLargeNumberOfRegionsConsistency() throws IOException {
regions.removeAll(regions);
@@ -572,8 +609,9 @@ public class TestHLogSplit {
generateHLogs(1, 100, -1);
fs.initialize(fs.getUri(), conf);
- HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
- logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+ hbaseDir, hlogDir, oldLogDir, fs);
+ logSplitter.splitLog();
fs.rename(oldLogDir, hlogDir);
Path firstSplitPath = new Path(hbaseDir, Bytes.toString(TABLE_NAME) + ".first");
Path splitPath = new Path(hbaseDir, Bytes.toString(TABLE_NAME));
@@ -582,7 +620,9 @@ public class TestHLogSplit {
fs.initialize(fs.getUri(), conf);
- logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ logSplitter = HLogSplitter.createLogSplitter(conf,
+ hbaseDir, hlogDir, oldLogDir, fs);
+ logSplitter.splitLog();
assertEquals(0, compareHLogSplitDirs(firstSplitPath, splitPath));
}
@@ -600,11 +640,161 @@ public class TestHLogSplit {
Path regiondir = new Path(tabledir, region);
fs.delete(regiondir, true);
- HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
- logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+ hbaseDir, hlogDir, oldLogDir, fs);
+ logSplitter.splitLog();
assertFalse(fs.exists(regiondir));
}
+
+ @Test
+ public void testIOEOnOutputThread() throws Exception {
+ conf.setBoolean(HBASE_SKIP_ERRORS, false);
+
+ generateHLogs(-1);
+
+ fs.initialize(fs.getUri(), conf);
+ // Set up a splitter that will throw an IOE on the output side
+ HLogSplitter logSplitter = new HLogSplitter(
+ conf, hbaseDir, hlogDir, oldLogDir, fs) {
+ protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
+ throws IOException {
+ HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
+ Mockito.doThrow(new IOException("Injected")).when(mockWriter).append(Mockito.<HLog.Entry>any());
+ return mockWriter;
+
+ }
+ };
+ try {
+ logSplitter.splitLog();
+ fail("Didn't throw!");
+ } catch (IOException ioe) {
+ assertTrue(ioe.toString().contains("Injected"));
+ }
+ }
+
+ /**
+ * Test log split process with fake data and lots of edits to trigger threading
+ * issues.
+ */
+ @Test
+ public void testThreading() throws Exception {
+ doTestThreading(20000, 128*1024*1024, 0);
+ }
+
+ /**
+ * Test blocking behavior of the log split process if writers are writing slower
+ * than the reader is reading.
+ */
+ @Test
+ public void testThreadingSlowWriterSmallBuffer() throws Exception {
+ doTestThreading(200, 1024, 50);
+ }
+
+ /**
+ * Sets up a log splitter with a mock reader and writer. The mock reader generates
+ * a specified number of edits spread across 5 regions. The mock writer optionally
+ * sleeps for each edit it is fed.
+ *
+ * After the split is complete, verifies that the statistics show the correct number
+ * of edits output into each region.
+ *
+ * @param numFakeEdits number of fake edits to push through pipeline
+ * @param bufferSize size of in-memory buffer
+ * @param writerSlowness writer threads will sleep this many ms per edit
+ */
+ private void doTestThreading(final int numFakeEdits,
+ final int bufferSize,
+ final int writerSlowness) throws Exception {
+
+ Configuration localConf = new Configuration(conf);
+ localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);
+
+ // Create a fake log file (we'll override the reader to produce a stream of edits)
+ FSDataOutputStream out = fs.create(new Path(hlogDir, HLOG_FILE_PREFIX + ".fake"));
+ out.close();
+
+ // Make region dirs for our destination regions so the output doesn't get skipped
+ final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
+ makeRegionDirs(fs, regions);
+
+ // Create a splitter that reads and writes the data without touching disk
+ HLogSplitter logSplitter = new HLogSplitter(
+ localConf, hbaseDir, hlogDir, oldLogDir, fs) {
+
+ /* Produce a mock writer that doesn't write anywhere */
+ protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
+ throws IOException {
+ HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
+ Mockito.doAnswer(new Answer<Void>() {
+ int expectedIndex = 0;
+
+ @Override
+ public Void answer(InvocationOnMock invocation) {
+ if (writerSlowness > 0) {
+ try {
+ Thread.sleep(writerSlowness);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ HLog.Entry entry = (Entry) invocation.getArguments()[0];
+ WALEdit edit = entry.getEdit();
+ List<KeyValue> keyValues = edit.getKeyValues();
+ assertEquals(1, keyValues.size());
+ KeyValue kv = keyValues.get(0);
+
+ // Check that the edits come in the right order.
+ assertEquals(expectedIndex, Bytes.toInt(kv.getRow()));
+ expectedIndex++;
+ return null;
+ }
+ }).when(mockWriter).append(Mockito.<HLog.Entry>any());
+ return mockWriter;
+ }
+
+
+ /* Produce a mock reader that generates fake entries */
+ protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf)
+ throws IOException {
+ Reader mockReader = Mockito.mock(Reader.class);
+ Mockito.doAnswer(new Answer<HLog.Entry>() {
+ int index = 0;
+
+ @Override
+ public HLog.Entry answer(InvocationOnMock invocation) throws Throwable {
+ if (index >= numFakeEdits) return null;
+
+ // Generate r0 through r4 in round robin fashion
+ int regionIdx = index % regions.size();
+ byte region[] = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)};
+
+ HLog.Entry ret = createTestEntry(TABLE_NAME, region,
+ Bytes.toBytes((int)(index / regions.size())),
+ FAMILY, QUALIFIER, VALUE, index);
+ index++;
+ return ret;
+ }
+ }).when(mockReader).next();
+ return mockReader;
+ }
+ };
+
+ logSplitter.splitLog();
+
+ // Verify number of written edits per region
+
+ Map<byte[], Long> outputCounts = logSplitter.getOutputCounts();
+ for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
+ LOG.info("Got " + entry.getValue() + " output edits for region " +
+ Bytes.toString(entry.getKey()));
+
+ assertEquals((long)entry.getValue(), numFakeEdits / regions.size());
+ }
+ assertEquals(regions.size(), outputCounts.size());
+ }
+
+
/**
* This thread will keep writing to the file after the split process has started
@@ -677,29 +867,19 @@ public class TestHLogSplit {
if (stop.get()) {
return;
}
- boolean splitStarted = false;
- Path p = new Path(hbaseDir, new String(TABLE_NAME));
- while (!splitStarted) {
- try {
- FileStatus [] statuses = fs.listStatus(p);
- // In 0.20, listStatus comes back with a null if file doesn't exit.
- // In 0.21, it throws FNFE.
- if (statuses != null && statuses.length > 0) {
- // Done.
- break;
- }
- } catch (FileNotFoundException e) {
- // Expected in hadoop 0.21
- } catch (IOException e1) {
- assertTrue("Failed to list status ", false);
- }
- flushToConsole("Juliet: split not started, sleeping a bit...");
- Threads.sleep(100);
- }
+ Path tableDir = new Path(hbaseDir, new String(TABLE_NAME));
+ Path regionDir = new Path(tableDir, regions.get(0));
+ Path recoveredEdits = new Path(regionDir, HLogSplitter.RECOVERED_EDITS);
String region = "juliet";
Path julietLog = new Path(hlogDir, HLOG_FILE_PREFIX + ".juliet");
try {
- fs.mkdirs(new Path(new Path(hbaseDir, region), region));
+
+ while (!fs.exists(recoveredEdits) && !stop.get()) {
+ flushToConsole("Juliet: split not started, sleeping a bit...");
+ Threads.sleep(10);
+ }
+
+ fs.mkdirs(new Path(tableDir, region));
HLog.Writer writer = HLog.createWriter(fs,
julietLog, conf);
appendEntry(writer, "juliet".getBytes(), ("juliet").getBytes(),
@@ -722,10 +902,15 @@ public class TestHLogSplit {
generateHLogs(NUM_WRITERS, ENTRIES, leaveOpen);
}
- private void generateHLogs(int writers, int entries, int leaveOpen) throws IOException {
+ private void makeRegionDirs(FileSystem fs, List<String> regions) throws IOException {
for (String region : regions) {
+ flushToConsole("Creating dir for region " + region);
fs.mkdirs(new Path(tabledir, region));
}
+ }
+
+ private void generateHLogs(int writers, int entries, int leaveOpen) throws IOException {
+ makeRegionDirs(fs, regions);
for (int i = 0; i < writers; i++) {
writer[i] = HLog.createWriter(fs, new Path(hlogDir, HLOG_FILE_PREFIX + i), conf);
for (int j = 0; j < entries; j++) {
@@ -835,14 +1020,20 @@ public class TestHLogSplit {
byte[] value, long seq)
throws IOException {
+ writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
+ writer.sync();
+ return seq;
+ }
+
+ private HLog.Entry createTestEntry(
+ byte[] table, byte[] region,
+ byte[] row, byte[] family, byte[] qualifier,
+ byte[] value, long seq) {
long time = System.nanoTime();
WALEdit edit = new WALEdit();
seq++;
edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value));
- writer.append(new HLog.Entry(new HLogKey(region, table, seq, time), edit));
- writer.sync();
- return seq;
-
+ return new HLog.Entry(new HLogKey(region, table, seq, time), edit);
}
@@ -864,6 +1055,14 @@ public class TestHLogSplit {
private int compareHLogSplitDirs(Path p1, Path p2) throws IOException {
FileStatus[] f1 = fs.listStatus(p1);
FileStatus[] f2 = fs.listStatus(p2);
+ assertNotNull("Path " + p1 + " doesn't exist", f1);
+ assertNotNull("Path " + p2 + " doesn't exist", f2);
+
+ System.out.println("Files in " + p1 + ": " +
+ Joiner.on(",").join(FileUtil.stat2Paths(f1)));
+ System.out.println("Files in " + p2 + ": " +
+ Joiner.on(",").join(FileUtil.stat2Paths(f2)));
+ assertEquals(f1.length, f2.length);
for (int i = 0; i < f1.length; i++) {
// Regions now have a directory named RECOVERED_EDITS_DIR and in here
Modified: hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1051279&r1=1051278&r2=1051279&view=diff
==============================================================================
--- hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Mon Dec 20 20:36:17 2010
@@ -487,9 +487,9 @@ public class TestWALReplay {
*/
private Path runWALSplit(final Configuration c) throws IOException {
FileSystem fs = FileSystem.get(c);
- HLogSplitter logSplitter = HLogSplitter.createLogSplitter(c);
- List<Path> splits = logSplitter.splitLog(this.hbaseRootDir, this.logDir,
- this.oldLogDir, fs, c);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(c,
+ this.hbaseRootDir, this.logDir, this.oldLogDir, fs);
+ List<Path> splits = logSplitter.splitLog();
// Split should generate only 1 file since there's only 1 region
assertEquals(1, splits.size());
// Make sure the file exists