You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2011/04/18 19:16:15 UTC
svn commit: r1094662 [2/3] - in /hbase/trunk: ./
src/main/java/org/apache/hadoop/hbase/master/
src/main/java/org/apache/hadoop/hbase/regionserver/
src/main/java/org/apache/hadoop/hbase/regionserver/wal/
src/main/java/org/apache/hadoop/hbase/zookeeper/ ...
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1094662&r1=1094661&r2=1094662&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Mon Apr 18 17:16:15 2011
@@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.ut
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.text.ParseException;
@@ -44,6 +45,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
@@ -52,8 +54,12 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -73,7 +79,7 @@ public class HLogSplitter {
*/
public static final String RECOVERED_EDITS = "recovered.edits";
-
+
static final Log LOG = LogFactory.getLog(HLogSplitter.class);
private boolean hasSplit = false;
@@ -87,7 +93,7 @@ public class HLogSplitter {
protected final Path oldLogDir;
protected final FileSystem fs;
protected final Configuration conf;
-
+
// Major subcomponents of the split process.
// These are separated into inner classes to make testing easier.
OutputSink outputSink;
@@ -101,17 +107,18 @@ public class HLogSplitter {
// consumed by the reader thread, or an exception occurred
Object dataAvailable = new Object();
-
+
/**
* Create a new HLogSplitter using the given {@link Configuration} and the
* <code>hbase.hlog.splitter.impl</code> property to derived the instance
* class to use.
- *
+ * <p>
* @param conf
* @param rootDir hbase directory
* @param srcDir logs directory
* @param oldLogDir directory where processed logs are archived to
* @param fs FileSystem
+ * @return New HLogSplitter instance
*/
public static HLogSplitter createLogSplitter(Configuration conf,
final Path rootDir, final Path srcDir,
@@ -151,18 +158,18 @@ public class HLogSplitter {
this.srcDir = srcDir;
this.oldLogDir = oldLogDir;
this.fs = fs;
-
+
entryBuffers = new EntryBuffers(
conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
128*1024*1024));
outputSink = new OutputSink();
}
-
+
/**
* Split up a bunch of regionserver commit log files that are no longer being
* written to, into new files, one per region for region to replay on startup.
* Delete the old log files when finished.
- *
+ *
* @throws IOException will throw if corrupted hlogs aren't tolerated
* @return the list of splits
*/
@@ -172,7 +179,7 @@ public class HLogSplitter {
"An HLogSplitter instance may only be used once");
hasSplit = true;
- long startTime = System.currentTimeMillis();
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
List<Path> splits = null;
if (!fs.exists(srcDir)) {
// Nothing to do
@@ -186,20 +193,20 @@ public class HLogSplitter {
LOG.info("Splitting " + logfiles.length + " hlog(s) in "
+ srcDir.toString());
splits = splitLog(logfiles);
-
- splitTime = System.currentTimeMillis() - startTime;
+
+ splitTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
LOG.info("hlog file splitting completed in " + splitTime +
" ms for " + srcDir.toString());
return splits;
}
-
+
/**
* @return time that this split took
*/
public long getTime() {
return this.splitTime;
}
-
+
/**
* @return aggregate size of hlogs that were split
*/
@@ -215,12 +222,12 @@ public class HLogSplitter {
Preconditions.checkState(hasSplit);
return outputSink.getOutputCounts();
}
-
+
/**
* Splits the HLog edits in the given list of logfiles (that are a mix of edits
* on multiple regions) by region and then splits them per region directories,
* in batches of (hbase.hlog.split.batch.size)
- *
+ * <p>
* This process is split into multiple threads. In the main thread, we loop
* through the logs to be split. For each log, we:
* <ul>
@@ -228,13 +235,13 @@ public class HLogSplitter {
* <li> Read each edit (see {@link #parseHLog}</li>
* <li> Mark as "processed" or "corrupt" depending on outcome</li>
* </ul>
- *
+ * <p>
* Each edit is passed into the EntryBuffers instance, which takes care of
* memory accounting and splitting the edits by region.
- *
+ * <p>
* The OutputSink object then manages N other WriterThreads which pull chunks
* of edits from EntryBuffers and write them to the output region directories.
- *
+ * <p>
* After the process is complete, the log files are archived to a separate
* directory.
*/
@@ -248,7 +255,7 @@ public class HLogSplitter {
splitSize = 0;
outputSink.startWriterThreads(entryBuffers);
-
+
try {
int i = 0;
for (FileStatus log : logfiles) {
@@ -257,36 +264,24 @@ public class HLogSplitter {
splitSize += logLength;
LOG.debug("Splitting hlog " + (i++ + 1) + " of " + logfiles.length
+ ": " + logPath + ", length=" + logLength);
+ Reader in;
try {
- recoverFileLease(fs, logPath, conf);
- parseHLog(log, entryBuffers, fs, conf);
- processedLogs.add(logPath);
- } catch (EOFException eof) {
- // truncated files are expected if a RS crashes (see HBASE-2643)
- LOG.info("EOF from hlog " + logPath + ". Continuing");
- processedLogs.add(logPath);
- } catch (FileNotFoundException fnfe) {
- // A file may be missing if the region server was able to archive it
- // before shutting down. This means the edits were persisted already
- LOG.info("A log was missing " + logPath +
- ", probably because it was moved by the" +
- " now dead region server. Continuing");
- processedLogs.add(logPath);
- } catch (IOException e) {
- // If the IOE resulted from bad file format,
- // then this problem is idempotent and retrying won't help
- if (e.getCause() instanceof ParseException) {
- LOG.warn("Parse exception from hlog " + logPath + ". continuing", e);
- processedLogs.add(logPath);
- } else {
- if (skipErrors) {
- LOG.info("Got while parsing hlog " + logPath +
- ". Marking as corrupted", e);
- corruptedLogs.add(logPath);
- } else {
- throw e;
+ in = getReader(fs, log, conf, skipErrors);
+ if (in != null) {
+ parseHLog(in, logPath, entryBuffers, fs, conf, skipErrors);
+ try {
+ in.close();
+ } catch (IOException e) {
+ LOG.warn("Close log reader threw exception -- continuing",
+ e);
}
}
+ processedLogs.add(logPath);
+ } catch (CorruptedLogFileException e) {
+ LOG.info("Got while parsing hlog " + logPath +
+ ". Marking as corrupted", e);
+ corruptedLogs.add(logPath);
+ continue;
}
}
if (fs.listStatus(srcDir).length > processedLogs.size()
@@ -295,7 +290,7 @@ public class HLogSplitter {
"Discovered orphan hlog after split. Maybe the "
+ "HRegionServer was not dead when we started");
}
- archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf);
+ archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf);
} finally {
splits = outputSink.finishWritingAndClose();
}
@@ -303,10 +298,214 @@ public class HLogSplitter {
}
/**
+   * Splits a single HLog file into a temporary staging area. <code>tmpname</code>
+   * is used to build the name of the staging area where the recovered edits
+   * are separated out by region and stored.
+   * <p>
+   * If the log file has N regions then N recovered.edits files will be
+   * produced. There is no buffering in this code. Instead it relies on the
+   * buffering in the SequenceFileWriter.
+   *
+   * @param rootDir hbase root directory handed to the new splitter
+   * @param tmpname name used to derive the staging area
+   * @param logfile the log file to split
+   * @param fs filesystem handed to the new splitter
+   * @param conf configuration handed to the new splitter
+   * @param reporter progress callback, forwarded to the instance method
+   * @return false if it is interrupted by the progress-able.
+   * @throws IOException
+   */
+  public static boolean splitLogFileToTemp(Path rootDir, String tmpname,
+      FileStatus logfile, FileSystem fs,
+      Configuration conf, CancelableProgressable reporter) throws IOException {
+    // Neither a source log dir nor an old-log dir is supplied (both null).
+    HLogSplitter splitter = new HLogSplitter(conf, rootDir, null, null, fs);
+    return splitter.splitLogFileToTemp(logfile, tmpname, reporter);
+  }
+
+  /**
+   * Splits a single log file into per-region recovered-edits files that are
+   * written under the staging area derived from <code>tmpname</code>.
+   * <p>
+   * Edits for regions for which no writer could be created are dropped.
+   *
+   * @param logfile the log file to split
+   * @param tmpname name used to derive the staging area
+   * @param reporter progress callback, may be null; the split is abandoned
+   *   as soon as it returns false
+   * @return false if the reporter asked for the split to stop, else true
+   * @throws IOException
+   */
+  public boolean splitLogFileToTemp(FileStatus logfile, String tmpname,
+      CancelableProgressable reporter) throws IOException {
+    final Map<byte[], Object> logWriters = Collections.
+      synchronizedMap(new TreeMap<byte[], Object>(Bytes.BYTES_COMPARATOR));
+    boolean isCorrupted = false;
+
+    // Sentinel stored for regions whose edits are thrown away.
+    Object BAD_WRITER = new Object();
+
+    boolean progress_failed = false;
+
+    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false);
+    int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
+    // How often to send a progress report (default 1/2 master timeout)
+    int period = conf.getInt("hbase.splitlog.report.period",
+      conf.getInt("hbase.splitlog.manager.timeout",
+        ZKSplitLog.DEFAULT_TIMEOUT) / 2);
+    Path logPath = logfile.getPath();
+    long logLength = logfile.getLen();
+    LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
+    Reader in = null;
+    try {
+      in = getReader(fs, logfile, conf, skipErrors);
+    } catch (CorruptedLogFileException e) {
+      LOG.warn("Could not get reader, corrupted log file " + logPath, e);
+      ZKSplitLog.markCorrupted(rootDir, tmpname, fs);
+      isCorrupted = true;
+    }
+    if (in == null) {
+      LOG.warn("Nothing to split in log file " + logPath);
+      return true;
+    }
+    long last_report_at = EnvironmentEdgeManager.currentTimeMillis();
+    if (reporter != null && !reporter.progress()) {
+      // Do not leak the reader when we give up before reading anything.
+      closeLogReader(in, logPath);
+      return false;
+    }
+    int editsCount = 0;
+    Entry entry;
+    try {
+      while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
+        byte[] region = entry.getKey().getEncodedRegionName();
+        Object o = logWriters.get(region);
+        if (o == BAD_WRITER) {
+          continue;
+        }
+        WriterAndPath wap = (WriterAndPath)o;
+        if (wap == null) {
+          wap = createWAP(region, entry, rootDir, tmpname, fs, conf);
+          if (wap == null) {
+            // No writer for this region: remember that and drop the edit
+            // rather than dereferencing null in the append below.
+            logWriters.put(region, BAD_WRITER);
+            continue;
+          }
+          logWriters.put(region, wap);
+        }
+        wap.w.append(entry);
+        editsCount++;
+        if (editsCount % interval == 0) {
+          long t1 = EnvironmentEdgeManager.currentTimeMillis();
+          if ((t1 - last_report_at) > period) {
+            // Restart the period from this report, not from the split start.
+            last_report_at = t1;
+            if (reporter != null && !reporter.progress()) {
+              progress_failed = true;
+              return false;
+            }
+          }
+        }
+      }
+    } catch (CorruptedLogFileException e) {
+      LOG.warn("Could not parse, corrupted log file " + logPath, e);
+      ZKSplitLog.markCorrupted(rootDir, tmpname, fs);
+      isCorrupted = true;
+    } catch (IOException e) {
+      e = RemoteExceptionHandler.checkIOException(e);
+      throw e;
+    } finally {
+      closeLogReader(in, logPath);
+      int n = 0;
+      for (Object o : logWriters.values()) {
+        long t1 = EnvironmentEdgeManager.currentTimeMillis();
+        if ((t1 - last_report_at) > period) {
+          last_report_at = t1;
+          if (!progress_failed && reporter != null && !reporter.progress()) {
+            progress_failed = true;
+          }
+        }
+        if (o == BAD_WRITER) {
+          continue;
+        }
+        n++;
+        WriterAndPath wap = (WriterAndPath)o;
+        wap.w.close();
+        LOG.debug("Closed " + wap.p);
+      }
+      LOG.info("processed " + editsCount + " edits across " + n + " regions" +
+        " threw away edits for " + (logWriters.size() - n) + " regions" +
+        " log file = " + logPath +
+        " is corrupted = " + isCorrupted);
+    }
+    // A progress failure seen while closing the writers also counts.
+    return !progress_failed;
+  }
+
+  /** Closes the log reader; a failure to close is logged and ignored. */
+  private static void closeLogReader(Reader in, Path logPath) {
+    try {
+      in.close();
+    } catch (IOException e) {
+      LOG.warn("Close log reader threw exception -- continuing " + logPath, e);
+    }
+  }
+
+  /**
+   * Completes the work done by splitLogFileToTemp by moving the
+   * recovered.edits out of the staging area. The hbase root directory is
+   * looked up from <code>conf</code> via FSUtils.getRootDir.
+   * <p>
+   * It is invoked by SplitLogManager once it knows that one of the
+   * SplitLogWorkers has completed the splitLogFileToTemp() part. If the
+   * master crashes then this function might get called multiple times.
+   * @param tmpname name used to derive the staging area
+   * @param logfile path of the log file that was split
+   * @param conf configuration used to locate the hbase root directory
+   * @throws IOException
+   */
+  public static void moveRecoveredEditsFromTemp(String tmpname,
+      String logfile, Configuration conf)
+  throws IOException {
+    Path hbaseRoot = FSUtils.getRootDir(conf);
+    moveRecoveredEditsFromTemp(tmpname, hbaseRoot,
+      new Path(hbaseRoot, HConstants.HREGION_OLDLOGDIR_NAME), logfile, conf);
+  }
+
+  /**
+   * Moves every file found under the staging area for <code>tmpname</code>
+   * to the path returned by ZKSplitLog.stripSplitLogTempDir, archives the
+   * split log file and finally deletes the staging area.
+   * <p>
+   * The log file is archived as corrupted when ZKSplitLog.isCorrupted says
+   * so, otherwise as processed. Corrupt flag files are not moved.
+   *
+   * @param tmpname name used to derive the staging area
+   * @param rootdir hbase root directory
+   * @param oldLogDir directory passed to archiveLogs for processed logs
+   * @param logfile path of the log file that was split
+   * @param conf
+   * @throws IOException also thrown when a recovered-edits file cannot be
+   *   renamed into place; the staging area is left untouched in that case
+   */
+  public static void moveRecoveredEditsFromTemp(String tmpname,
+      Path rootdir, Path oldLogDir,
+      String logfile, Configuration conf)
+  throws IOException {
+    List<Path> processedLogs = new ArrayList<Path>();
+    List<Path> corruptedLogs = new ArrayList<Path>();
+    FileSystem fs = rootdir.getFileSystem(conf);
+    Path logPath = new Path(logfile);
+    if (ZKSplitLog.isCorrupted(rootdir, tmpname, fs)) {
+      corruptedLogs.add(logPath);
+    } else {
+      processedLogs.add(logPath);
+    }
+    Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, tmpname);
+    for (FileStatus f : listAll(fs, stagingDir)) {
+      Path src = f.getPath();
+      Path dst = ZKSplitLog.stripSplitLogTempDir(rootdir, src);
+      if (ZKSplitLog.isCorruptFlagFile(dst)) {
+        continue;
+      }
+      if (fs.exists(dst)) {
+        if (!fs.delete(dst, false)) LOG.warn("Failed delete of old " + dst);
+      } else {
+        Path dstdir = dst.getParent();
+        if (!fs.exists(dstdir)) {
+          if (!fs.mkdirs(dstdir)) LOG.warn("mkdir failed on " + dstdir);
+        }
+      }
+      if (!fs.rename(src, dst)) {
+        // The staging area is deleted recursively below, so carrying on
+        // after a failed rename would silently discard these edits.
+        throw new IOException("Failed to move " + src + " to " + dst);
+      }
+      LOG.debug(" moved " + src + " => " + dst);
+    }
+    archiveLogs(null, corruptedLogs, processedLogs,
+      oldLogDir, fs, conf);
+    fs.delete(stagingDir, true);
+  }
+
+  /**
+   * Recursively gathers every non-directory entry found beneath
+   * <code>dir</code>. A null listing contributes nothing.
+   */
+  private static List<FileStatus> listAll(FileSystem fs, Path dir)
+  throws IOException {
+    List<FileStatus> collected = new ArrayList<FileStatus>(100);
+    FileStatus [] children = fs.listStatus(dir);
+    if (children == null) {
+      return collected;
+    }
+    for (int i = 0; i < children.length; i++) {
+      FileStatus child = children[i];
+      if (!child.isDir()) {
+        collected.add(child);
+        continue;
+      }
+      // Descend into sub-directories and keep their files as well.
+      collected.addAll(listAll(fs, child.getPath()));
+    }
+    return collected;
+  }
+
+
+ /**
* Moves processed logs to a oldLogDir after successful processing Moves
* corrupted logs (any log that couldn't be successfully parsed to corruptDir
* (.corrupt) for later investigation
- *
+ *
* @param corruptedLogs
* @param processedLogs
* @param oldLogDir
@@ -329,7 +528,7 @@ public class HLogSplitter {
for (Path corrupted : corruptedLogs) {
Path p = new Path(corruptDir, corrupted.getName());
- if (!fs.rename(corrupted, p)) {
+ if (!fs.rename(corrupted, p)) {
LOG.info("Unable to move corrupted log " + corrupted + " to " + p);
} else {
LOG.info("Moving corrupted log " + corrupted + " to " + p);
@@ -344,8 +543,8 @@ public class HLogSplitter {
LOG.info("Archived processed log " + p + " to " + newPath);
}
}
-
- if (!fs.delete(srcDir, true)) {
+
+ if (srcDir != null && !fs.delete(srcDir, true)) {
throw new IOException("Unable to delete src dir: " + srcDir);
}
}
@@ -363,19 +562,21 @@ public class HLogSplitter {
* @throws IOException
*/
static Path getRegionSplitEditsPath(final FileSystem fs,
- final Entry logEntry, final Path rootDir) throws IOException {
+ final Entry logEntry, final Path rootDir, boolean isCreate)
+ throws IOException {
Path tableDir = HTableDescriptor.getTableDir(rootDir, logEntry.getKey()
.getTablename());
Path regiondir = HRegion.getRegionDir(tableDir,
Bytes.toString(logEntry.getKey().getEncodedRegionName()));
+ Path dir = HLog.getRegionDirRecoveredEditsDir(regiondir);
+
if (!fs.exists(regiondir)) {
LOG.info("This region's directory doesn't exist: "
+ regiondir.toString() + ". It is very likely that it was" +
" already split so it's safe to discard those edits.");
return null;
}
- Path dir = HLog.getRegionDirRecoveredEditsDir(regiondir);
- if (!fs.exists(dir)) {
+ if (isCreate && !fs.exists(dir)) {
if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
}
return new Path(dir, formatRecoveredEditsFileName(logEntry.getKey()
@@ -385,7 +586,7 @@ public class HLogSplitter {
static String formatRecoveredEditsFileName(final long seqid) {
return String.format("%019d", seqid);
}
-
+
/*
* Parse a single hlog and put the edits in @splitLogsMap
*
@@ -394,61 +595,116 @@ public class HLogSplitter {
* list of edits as values
* @param fs the filesystem
* @param conf the configuration
- * @throws IOException if hlog is corrupted, or can't be open
+ * @throws IOException
+ * @throws CorruptedLogFileException if hlog is corrupted
*/
- private void parseHLog(final FileStatus logfile,
+ private void parseHLog(final Reader in, Path path,
EntryBuffers entryBuffers, final FileSystem fs,
- final Configuration conf)
- throws IOException {
- // Check for possibly empty file. With appends, currently Hadoop reports a
- // zero length even if the file has been sync'd. Revisit if HDFS-376 or
- // HDFS-878 is committed.
- long length = logfile.getLen();
- if (length <= 0) {
- LOG.warn("File " + logfile.getPath() + " might be still open, length is 0");
- }
- Path path = logfile.getPath();
- Reader in;
+ final Configuration conf, boolean skipErrors)
+ throws IOException, CorruptedLogFileException {
int editsCount = 0;
try {
- in = getReader(fs, path, conf);
- } catch (EOFException e) {
- if (length <= 0) {
- //TODO should we ignore an empty, not-last log file if skip.errors is false?
- //Either way, the caller should decide what to do. E.g. ignore if this is the last
- //log in sequence.
- //TODO is this scenario still possible if the log has been recovered (i.e. closed)
- LOG.warn("Could not open " + path + " for reading. File is empty" + e);
- return;
- } else {
- throw e;
- }
- }
- try {
Entry entry;
- while ((entry = in.next()) != null) {
+ while ((entry = getNextLogLine(in, path, skipErrors)) != null) {
entryBuffers.appendEntry(entry);
editsCount++;
}
} catch (InterruptedException ie) {
- throw new RuntimeException(ie);
+ IOException t = new InterruptedIOException();
+ t.initCause(ie);
+ throw t;
} finally {
LOG.debug("Pushed=" + editsCount + " entries from " + path);
+ }
+ }
+
+  /**
+   * Create a new {@link Reader} for reading logs to split. The lease on the
+   * file is recovered before the file is opened.
+   *
+   * @param fs
+   * @param file
+   * @param conf
+   * @param skipErrors when true, an IOException raised while recovering the
+   *   lease or opening the file is rethrown as a CorruptedLogFileException;
+   *   when false the IOException itself is rethrown
+   * @return A new Reader instance, or null when opening the file failed with
+   *   an EOFException
+   * @throws IOException
+   * @throws CorruptedLogFileException
+   */
+  protected Reader getReader(FileSystem fs, FileStatus file, Configuration conf,
+      boolean skipErrors)
+  throws IOException, CorruptedLogFileException {
+    Path path = file.getPath();
+    long length = file.getLen();
+    Reader in;
+
+    // Check for possibly empty file. With appends, currently Hadoop reports a
+    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
+    // HDFS-878 is committed.
+    if (length <= 0) {
+      LOG.warn("File " + path + " might be still open, length is 0");
+    }
+
+    try {
+      recoverFileLease(fs, path, conf);
+      try {
+        in = getReader(fs, path, conf);
+      } catch (EOFException e) {
+        if (length <= 0) {
+          // TODO should we ignore an empty, not-last log file if skip.errors
+          // is false? Either way, the caller should decide what to do. E.g.
+          // ignore if this is the last log in sequence.
+          // TODO is this scenario still possible if the log has been
+          // recovered (i.e. closed)
+          LOG.warn("Could not open " + path + " for reading. File is empty", e);
+        } else {
+          // The EOFException is still ignored, but leave a trace of it.
+          LOG.warn("EOF while opening " + path + ", length=" + length +
+            ". Skipping", e);
+        }
+        return null;
+      }
+    } catch (IOException e) {
+      if (!skipErrors) {
+        throw e;
+      }
+      CorruptedLogFileException t =
+        new CorruptedLogFileException("skipErrors=true Could not open hlog " +
+          path + " ignoring");
+      t.initCause(e);
+      throw t;
+    }
+    return in;
+  }
+
+  /**
+   * Reads the next entry from the log reader.
+   *
+   * @param in reader to pull the entry from
+   * @param path log file being read, used in log and exception messages
+   * @param skipErrors when true, an unexpected IOException is rethrown as a
+   *   CorruptedLogFileException; when false the IOException is rethrown
+   * @return the next entry, or null on EOFException or when the IOException
+   *   was caused by a ParseException
+   * @throws CorruptedLogFileException
+   * @throws IOException
+   */
+  private static Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
+  throws CorruptedLogFileException, IOException {
+    try {
+      return in.next();
+    } catch (EOFException eof) {
+      // truncated files are expected if a RS crashes (see HBASE-2643)
+      LOG.info("EOF from hlog " + path + ". continuing");
+      return null;
+    } catch (IOException e) {
+      // If the IOE resulted from bad file format,
+      // then this problem is idempotent and retrying won't help
+      if (e.getCause() instanceof ParseException) {
+        // Keep the exception in the log so the bad record can be diagnosed.
+        LOG.warn("ParseException from hlog " + path + ". continuing", e);
+        return null;
+      }
+      if (!skipErrors) {
+        throw e;
+      }
+      CorruptedLogFileException t =
+        new CorruptedLogFileException("skipErrors=true Ignoring exception" +
+          " while parsing hlog " + path + ". Marking as corrupted");
+      t.initCause(e);
+      throw t;
+    }
+  }
+
private void writerThreadError(Throwable t) {
thrown.compareAndSet(null, t);
}
-
+
/**
* Check for errors in the writer threads. If any is found, rethrow it.
*/
@@ -477,26 +733,25 @@ public class HLogSplitter {
return HLog.getReader(fs, curLogFile, conf);
}
-
/**
* Class which accumulates edits and separates them into a buffer per region
* while simultaneously accounting RAM usage. Blocks if the RAM usage crosses
* a predefined threshold.
- *
+ *
* Writer threads then pull region-specific buffers from this class.
*/
class EntryBuffers {
Map<byte[], RegionEntryBuffer> buffers =
new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);
-
+
/* Track which regions are currently in the middle of writing. We don't allow
an IO thread to pick up bytes from a region if we're already writing
- data for that region in a different IO thread. */
+ data for that region in a different IO thread. */
Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
long totalBuffered = 0;
long maxHeapUsage;
-
+
EntryBuffers(long maxHeapUsage) {
this.maxHeapUsage = maxHeapUsage;
}
@@ -504,13 +759,13 @@ public class HLogSplitter {
/**
* Append a log entry into the corresponding region buffer.
* Blocks if the total heap usage has crossed the specified threshold.
- *
+ *
* @throws InterruptedException
- * @throws IOException
+ * @throws IOException
*/
void appendEntry(Entry entry) throws InterruptedException, IOException {
HLogKey key = entry.getKey();
-
+
RegionEntryBuffer buffer;
synchronized (this) {
buffer = buffers.get(key.getEncodedRegionName());
@@ -566,7 +821,7 @@ public class HLogSplitter {
dataAvailable.notifyAll();
}
}
-
+
synchronized boolean isRegionCurrentlyWriting(byte[] region) {
return currentlyWriting.contains(region);
}
@@ -614,11 +869,11 @@ public class HLogSplitter {
class WriterThread extends Thread {
private volatile boolean shouldStop = false;
-
+
WriterThread(int i) {
super("WriterThread-" + i);
}
-
+
public void run() {
try {
doRun();
@@ -627,7 +882,7 @@ public class HLogSplitter {
writerThreadError(t);
}
}
-
+
private void doRun() throws IOException {
LOG.debug("Writer thread " + this + ": starting");
while (true) {
@@ -646,7 +901,7 @@ public class HLogSplitter {
}
continue;
}
-
+
assert buffer != null;
try {
writeBuffer(buffer);
@@ -655,16 +910,16 @@ public class HLogSplitter {
}
}
}
-
+
private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
- List<Entry> entries = buffer.entryBuffer;
+ List<Entry> entries = buffer.entryBuffer;
if (entries.isEmpty()) {
LOG.warn(this.getName() + " got an empty buffer, skipping");
return;
}
WriterAndPath wap = null;
-
+
long startTime = System.nanoTime();
try {
int editsCount = 0;
@@ -690,12 +945,74 @@ public class HLogSplitter {
throw e;
}
}
-
+
void finish() {
shouldStop = true;
}
}
+  /**
+   * Opens the recovered-edits writer for one region.
+   * <p>
+   * Without a <code>tmpname</code> the writer targets the region's edits
+   * path directly, after deleting any file already there. With a
+   * <code>tmpname</code> it targets the converted path in the staging area.
+   *
+   * @return the writer and its path, or null when getRegionSplitEditsPath
+   *   returns null for this entry
+   */
+  private WriterAndPath createWAP(byte[] region, Entry entry,
+      Path rootdir, String tmpname, FileSystem fs, Configuration conf)
+  throws IOException {
+    boolean direct = (tmpname == null);
+    Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, direct);
+    if (regionedits == null) {
+      return null;
+    }
+    Path editsfile;
+    if (direct) {
+      if (fs.exists(regionedits)) {
+        LOG.warn("Found existing old edits file. It could be the "
+          + "result of a previous failed split attempt. Deleting "
+          + regionedits + ", length="
+          + fs.getFileStatus(regionedits).getLen());
+        if (!fs.delete(regionedits, false)) {
+          LOG.warn("Failed delete of old " + regionedits);
+        }
+      }
+      editsfile = regionedits;
+    } else {
+      // During distributed log splitting the output by each
+      // SplitLogWorker is written to a temporary area.
+      editsfile = convertRegionEditsToTemp(rootdir, regionedits, tmpname);
+    }
+    Writer out = createWriter(fs, editsfile, conf);
+    LOG.debug("Creating writer path=" + editsfile + " region="
+      + Bytes.toStringBinary(region));
+    return new WriterAndPath(editsfile, out);
+  }
+
+  /**
+   * Re-roots an edits path that lives below <code>rootdir</code> onto the
+   * split-log staging directory for <code>tmpname</code>, keeping the same
+   * relative components.
+   * <p>
+   * A file already present at the resulting path is deleted and the parent
+   * directory is created if missing. An IOException during that preparation
+   * is only logged.
+   */
+  Path convertRegionEditsToTemp(Path rootdir, Path edits, String tmpname) {
+    // Walk up towards rootdir, keeping the names in top-down order.
+    List<String> names = new ArrayList<String>(10);
+    Path cursor = edits;
+    do {
+      names.add(0, cursor.getName());
+      cursor = cursor.getParent();
+    } while (cursor.depth() > rootdir.depth());
+    Path staged = ZKSplitLog.getSplitLogDir(rootdir, tmpname);
+    for (String name : names) {
+      staged = new Path(staged, name);
+    }
+    try {
+      if (fs.exists(staged)) {
+        LOG.warn("Found existing old temporary edits file. It could be the "
+          + "result of a previous failed split attempt. Deleting "
+          + staged + ", length="
+          + fs.getFileStatus(staged).getLen());
+        if (!fs.delete(staged, false)) {
+          LOG.warn("Failed delete of old " + staged);
+        }
+      }
+      Path parent = staged.getParent();
+      if (!fs.exists(parent) && !fs.mkdirs(parent)) {
+        LOG.warn("mkdir failed on " + parent);
+      }
+    } catch (IOException e) {
+      LOG.warn("Could not prepare temp staging area ", e);
+      // ignore, exceptions will be thrown elsewhere
+    }
+    return staged;
+  }
+
/**
* Class that manages the output streams from the log splitting process.
*/
@@ -703,13 +1020,13 @@ public class HLogSplitter {
private final Map<byte[], WriterAndPath> logWriters = Collections.synchronizedMap(
new TreeMap<byte[], WriterAndPath>(Bytes.BYTES_COMPARATOR));
private final List<WriterThread> writerThreads = Lists.newArrayList();
-
+
/* Set of regions which we've decided should not output edits */
private final Set<byte[]> blacklistedRegions = Collections.synchronizedSet(
new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
-
- private boolean hasClosed = false;
-
+
+ private boolean hasClosed = false;
+
/**
* Start the threads that will pump data from the entryBuffers
* to the output files.
@@ -730,7 +1047,7 @@ public class HLogSplitter {
writerThreads.add(t);
}
}
-
+
List<Path> finishWritingAndClose() throws IOException {
LOG.info("Waiting for split writer threads to finish");
for (WriterThread t : writerThreads) {
@@ -745,7 +1062,7 @@ public class HLogSplitter {
checkForErrors();
}
LOG.info("Split writers finished");
-
+
return closeStreams();
}
@@ -755,10 +1072,10 @@ public class HLogSplitter {
*/
private List<Path> closeStreams() throws IOException {
Preconditions.checkState(!hasClosed);
-
+
List<Path> paths = new ArrayList<Path>();
List<IOException> thrown = Lists.newArrayList();
-
+
for (WriterAndPath wap : logWriters.values()) {
try {
wap.w.close();
@@ -774,67 +1091,40 @@ public class HLogSplitter {
if (!thrown.isEmpty()) {
throw MultipleIOException.createIOException(thrown);
}
-
+
hasClosed = true;
return paths;
}
/**
* Get a writer and path for a log starting at the given entry.
- *
+ *
* This function is threadsafe so long as multiple threads are always
* acting on different regions.
- *
+ *
* @return null if this region shouldn't output any logs
*/
WriterAndPath getWriterAndPath(Entry entry) throws IOException {
-
byte region[] = entry.getKey().getEncodedRegionName();
WriterAndPath ret = logWriters.get(region);
if (ret != null) {
return ret;
}
-
// If we already decided that this region doesn't get any output
// we don't need to check again.
if (blacklistedRegions.contains(region)) {
return null;
}
-
- // Need to create writer
- Path regionedits = getRegionSplitEditsPath(fs,
- entry, rootDir);
- if (regionedits == null) {
- // Edits dir doesn't exist
+ ret = createWAP(region, entry, rootDir, null, fs, conf);
+ if (ret == null) {
blacklistedRegions.add(region);
return null;
}
- deletePreexistingOldEdits(regionedits);
- Writer w = createWriter(fs, regionedits, conf);
- ret = new WriterAndPath(regionedits, w);
logWriters.put(region, ret);
- LOG.debug("Creating writer path=" + regionedits + " region="
- + Bytes.toStringBinary(region));
-
return ret;
}
/**
- * If the specified path exists, issue a warning and delete it.
- */
- private void deletePreexistingOldEdits(Path regionedits) throws IOException {
- if (fs.exists(regionedits)) {
- LOG.warn("Found existing old edits file. It could be the "
- + "result of a previous failed split attempt. Deleting "
- + regionedits + ", length="
- + fs.getFileStatus(regionedits).getLen());
- if (!fs.delete(regionedits, false)) {
- LOG.warn("Failed delete of old " + regionedits);
- }
- }
- }
-
- /**
* @return a map from encoded region ID to the number of edits written out
* for that region.
*/
@@ -850,6 +1140,8 @@ public class HLogSplitter {
}
}
+
+
/**
* Private data structure that wraps a Writer and its Path,
* also collecting statistics about the data written to this
@@ -877,4 +1169,11 @@ public class HLogSplitter {
nanosSpent += nanos;
}
}
+
+  /**
+   * Checked exception thrown by the reader helpers in this class when a log
+   * file is to be treated as corrupted. The underlying cause, if any, is
+   * attached by the thrower through initCause.
+   */
+  static class CorruptedLogFileException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    CorruptedLogFileException(String message) {
+      super(message);
+    }
+  }
}
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java?rev=1094662&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java Mon Apr 18 17:16:15 2011
@@ -0,0 +1,271 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.master.SplitLogManager;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Common methods and attributes used by {@link SplitLogManager} and
+ * {@link SplitLogWorker}
+ */
+public class ZKSplitLog {
+ private static final Log LOG = LogFactory.getLog(ZKSplitLog.class);
+
+ public static final int DEFAULT_TIMEOUT = 25000; // 25 sec
+ public static final int DEFAULT_ZK_RETRIES = 3;
+ public static final int DEFAULT_MAX_RESUBMIT = 3;
+ public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); //3 min
+
+ /**
+  * Builds the full znode path used for the split task of a log file.
+  * The file name is URL-encoded and joined under {@code zkw.splitLogZNode}.
+  * @param zkw zk reference supplying the parent split-log znode
+  * @param filename log file name (only the basename)
+  * @return the full znode path for the given log file
+  */
+ public static String getNodeName(ZooKeeperWatcher zkw, String filename) {
+   String encodedName = encode(filename);
+   return ZKUtil.joinZNode(zkw.splitLogZNode, encodedName);
+ }
+
+ /**
+  * Recovers the log file name from a task znode path: takes everything after
+  * the last '/' and URL-decodes it (the inverse of the encoding applied by
+  * {@link #getNodeName(ZooKeeperWatcher, String)}).
+  * @param node znode path
+  * @return decoded last path component
+  */
+ public static String getFileName(String node) {
+   int lastSlash = node.lastIndexOf('/');
+   return decode(node.substring(lastSlash + 1));
+ }
+
+
+ /**
+  * URL-encodes a string using UTF-8 so that it can be used as a single znode
+  * name component.
+  * @param s string to encode
+  * @return the URL-encoded form of {@code s}
+  * @throws RuntimeException if the JVM does not support UTF-8; the original
+  *         {@link UnsupportedEncodingException} is attached as the cause
+  */
+ public static String encode(String s) {
+   try {
+     return URLEncoder.encode(s, "UTF-8");
+   } catch (UnsupportedEncodingException e) {
+     // Keep the cause so the stack trace shows where the failure originated.
+     throw new RuntimeException("URLENCODER doesn't support UTF-8", e);
+   }
+ }
+
+ /**
+  * Decodes a string that was URL-encoded with UTF-8 (see
+  * {@link #encode(String)}).
+  * @param s URL-encoded string
+  * @return the decoded string
+  * @throws RuntimeException if the JVM does not support UTF-8; the original
+  *         {@link UnsupportedEncodingException} is attached as the cause
+  */
+ public static String decode(String s) {
+   try {
+     return URLDecoder.decode(s, "UTF-8");
+   } catch (UnsupportedEncodingException e) {
+     // Keep the cause so the stack trace shows where the failure originated.
+     throw new RuntimeException("URLDecoder doesn't support UTF-8", e);
+   }
+ }
+
+ /**
+  * @param zkw zk reference
+  * @return the znode path obtained by treating "RESCAN" as a file name under
+  *         the split-log znode
+  */
+ public static String getRescanNode(ZooKeeperWatcher zkw) {
+   final String rescanName = "RESCAN";
+   return getNodeName(zkw, rescanName);
+ }
+
+ /**
+  * Tells whether a path names a rescan node: it must be strictly longer than
+  * the rescan node path returned by {@link #getRescanNode(ZooKeeperWatcher)}
+  * and start with it.
+  * @param zkw zk reference
+  * @param path znode path to test
+  * @return true if {@code path} is the rescan prefix followed by at least one
+  *         more character
+  */
+ public static boolean isRescanNode(ZooKeeperWatcher zkw, String path) {
+   String rescanPrefix = getRescanNode(zkw);
+   // The prefix alone does not count; a suffix must follow it.
+   return path.length() > rescanPrefix.length()
+       && path.startsWith(rescanPrefix);
+ }
+
+ /**
+  * Tells whether a znode path is a direct child of the split-log znode.
+  * @param zkw zk reference supplying {@code splitLogZNode}
+  * @param path znode path to test
+  * @return true if the part of {@code path} before its last '/' equals
+  *         {@code zkw.splitLogZNode}; false otherwise, including when
+  *         {@code path} contains no '/' at all
+  */
+ public static boolean isTaskPath(ZooKeeperWatcher zkw, String path) {
+   int lastSlash = path.lastIndexOf('/');
+   if (lastSlash < 0) {
+     // No parent component: substring(0, -1) would throw, and such a path
+     // cannot live under the split-log znode anyway.
+     return false;
+   }
+   return path.substring(0, lastSlash).equals(zkw.splitLogZNode);
+ }
+
+ /**
+  * States a split-log task znode can be in. The znode data has the form
+  * {@code "<state> <serverName>"}; see {@link #get(String)}.
+  * <p>
+  * All string/byte conversions go through {@link Bytes} so that encoding and
+  * decoding both use UTF-8 instead of mixing in the platform default charset.
+  */
+ public static enum TaskState {
+   TASK_UNASSIGNED("unassigned"),
+   TASK_OWNED("owned"),
+   TASK_RESIGNED("resigned"),
+   TASK_DONE("done"),
+   TASK_ERR("err");
+
+   private final byte[] state;
+
+   private TaskState(String s) {
+     state = Bytes.toBytes(s);
+   }
+
+   /**
+    * @param serverName name to append after the state
+    * @return the bytes of {@code "<state> <serverName>"}
+    */
+   public byte[] get(String serverName) {
+     return Bytes.add(state, Bytes.toBytes(" "), Bytes.toBytes(serverName));
+   }
+
+   /**
+    * @param data znode data as produced by {@link #get(String)}
+    * @return everything after the first space in {@code data}; the whole
+    *         string when it contains no space
+    */
+   public String getWriterName(byte[] data) {
+     String str = Bytes.toString(data);
+     return str.substring(str.indexOf(' ') + 1);
+   }
+
+   /**
+    * @param s
+    * @return True if {@link #state} is a prefix of s. False otherwise.
+    */
+   public boolean equals(byte[] s) {
+     if (s.length < state.length) {
+       return false;
+     }
+     for (int i = 0; i < state.length; i++) {
+       if (state[i] != s[i]) {
+         return false;
+       }
+     }
+     return true;
+   }
+
+   /**
+    * @param s znode data
+    * @param serverName expected server name
+    * @return true if {@code s} is exactly {@code get(serverName)}
+    */
+   public boolean equals(byte[] s, String serverName) {
+     return Arrays.equals(s, get(serverName));
+   }
+
+   @Override
+   public String toString() {
+     return Bytes.toString(state);
+   }
+ }
+
+ /**
+  * @param rootdir root directory
+  * @param tmpname name of the temporary directory
+  * @return {@code <rootdir>/splitlog/<tmpname>}
+  */
+ public static Path getSplitLogDir(Path rootdir, String tmpname) {
+   Path splitLogRoot = new Path(rootdir, "splitlog");
+   return new Path(splitLogRoot, tmpname);
+ }
+
+ /**
+  * Re-roots a file directly under {@code rootdir} by dropping the path
+  * components between {@code rootdir} and depth {@code rootdir.depth() + 2}
+  * (two levels, matching the layout built by
+  * {@link #getSplitLogDir(Path, String)}).
+  * @param rootdir root directory
+  * @param file path located below a directory two levels under rootdir
+  * @return {@code rootdir} followed by the remaining components of file
+  */
+ public static Path stripSplitLogTempDir(Path rootdir, Path file) {
+   final int tmpDirDepth = rootdir.depth() + 2;
+   List<String> names = new ArrayList<String>(10);
+   Path cursor = file;
+   // Walk upwards, always taking at least the file's own name, and collect
+   // the names in top-down order.
+   do {
+     names.add(0, cursor.getName());
+     cursor = cursor.getParent();
+   } while (cursor.depth() > tmpDirDepth);
+   Path stripped = rootdir;
+   for (String name : names) {
+     stripped = new Path(stripped, name);
+   }
+   return stripped;
+ }
+
+ /**
+  * @param worker worker name
+  * @param file file name; it is URL-encoded before use
+  * @return {@code <worker>_<encoded file>}
+  */
+ public static String getSplitLogDirTmpComponent(String worker, String file) {
+   String encodedFile = ZKSplitLog.encode(file);
+   return worker + "_" + encodedFile;
+ }
+
+ /**
+  * Creates the flag file {@code corrupt} inside the split-log directory
+  * {@code <rootdir>/splitlog/<tmpname>}. This is best effort: an
+  * {@link IOException} is logged at WARN and not propagated.
+  * @param rootdir root directory
+  * @param tmpname name of the temporary split-log directory
+  * @param fs file system to create the flag on
+  */
+ public static void markCorrupted(Path rootdir, String tmpname,
+     FileSystem fs) {
+   Path flag = new Path(getSplitLogDir(rootdir, tmpname), "corrupt");
+   try {
+     fs.createNewFile(flag);
+   } catch (IOException ioe) {
+     LOG.warn("Could not flag a log file as corrupted. Failed to create " +
+         flag, ioe);
+   }
+ }
+
+ /**
+  * @param rootdir root directory
+  * @param tmpname name of the temporary split-log directory
+  * @param fs file system to check
+  * @return true if the flag file {@code corrupt} exists in
+  *         {@code <rootdir>/splitlog/<tmpname>}
+  * @throws IOException if the existence check fails
+  */
+ public static boolean isCorrupted(Path rootdir, String tmpname,
+     FileSystem fs) throws IOException {
+   Path flag = new Path(getSplitLogDir(rootdir, tmpname), "corrupt");
+   return fs.exists(flag);
+ }
+
+ /**
+  * @param file path to test
+  * @return true if the last component of {@code file} is {@code corrupt}
+  */
+ public static boolean isCorruptFlagFile(Path file) {
+   String basename = file.getName();
+   return basename.equals("corrupt");
+ }
+
+
+ public static class Counters {
+ //SplitLogManager counters
+ public static AtomicLong tot_mgr_log_split_batch_start = new AtomicLong(0);
+ public static AtomicLong tot_mgr_log_split_batch_success =
+ new AtomicLong(0);
+ public static AtomicLong tot_mgr_log_split_batch_err = new AtomicLong(0);
+ public static AtomicLong tot_mgr_new_unexpected_hlogs = new AtomicLong(0);
+ public static AtomicLong tot_mgr_log_split_start = new AtomicLong(0);
+ public static AtomicLong tot_mgr_log_split_success = new AtomicLong(0);
+ public static AtomicLong tot_mgr_log_split_err = new AtomicLong(0);
+ public static AtomicLong tot_mgr_node_create_queued = new AtomicLong(0);
+ public static AtomicLong tot_mgr_node_create_result = new AtomicLong(0);
+ public static AtomicLong tot_mgr_node_already_exists = new AtomicLong(0);
+ public static AtomicLong tot_mgr_node_create_err = new AtomicLong(0);
+ public static AtomicLong tot_mgr_node_create_retry = new AtomicLong(0);
+ public static AtomicLong tot_mgr_get_data_queued = new AtomicLong(0);
+ public static AtomicLong tot_mgr_get_data_result = new AtomicLong(0);
+ public static AtomicLong tot_mgr_get_data_err = new AtomicLong(0);
+ public static AtomicLong tot_mgr_get_data_retry = new AtomicLong(0);
+ public static AtomicLong tot_mgr_node_delete_queued = new AtomicLong(0);
+ public static AtomicLong tot_mgr_node_delete_result = new AtomicLong(0);
+ public static AtomicLong tot_mgr_node_delete_err = new AtomicLong(0);
+ public static AtomicLong tot_mgr_resubmit = new AtomicLong(0);
+ public static AtomicLong tot_mgr_resubmit_failed = new AtomicLong(0);
+ public static AtomicLong tot_mgr_null_data = new AtomicLong(0);
+ public static AtomicLong tot_mgr_orphan_task_acquired = new AtomicLong(0);
+ public static AtomicLong tot_mgr_unacquired_orphan_done = new AtomicLong(0);
+ public static AtomicLong tot_mgr_resubmit_threshold_reached =
+ new AtomicLong(0);
+ public static AtomicLong tot_mgr_missing_state_in_delete =
+ new AtomicLong(0);
+ public static AtomicLong tot_mgr_heartbeat = new AtomicLong(0);
+ public static AtomicLong tot_mgr_rescan = new AtomicLong(0);
+ public static AtomicLong tot_mgr_rescan_deleted = new AtomicLong(0);
+ public static AtomicLong tot_mgr_task_deleted = new AtomicLong(0);
+ public static AtomicLong tot_mgr_resubmit_unassigned = new AtomicLong(0);
+ public static AtomicLong tot_mgr_relist_logdir = new AtomicLong(0);
+
+
+
+ // SplitLogWorker counters
+ public static AtomicLong tot_wkr_failed_to_grab_task_no_data =
+ new AtomicLong(0);
+ public static AtomicLong tot_wkr_failed_to_grab_task_exception =
+ new AtomicLong(0);
+ public static AtomicLong tot_wkr_failed_to_grab_task_owned =
+ new AtomicLong(0);
+ public static AtomicLong tot_wkr_failed_to_grab_task_lost_race =
+ new AtomicLong(0);
+ public static AtomicLong tot_wkr_task_acquired = new AtomicLong(0);
+ public static AtomicLong tot_wkr_task_resigned = new AtomicLong(0);
+ public static AtomicLong tot_wkr_task_done = new AtomicLong(0);
+ public static AtomicLong tot_wkr_task_err = new AtomicLong(0);
+ public static AtomicLong tot_wkr_task_heartbeat = new AtomicLong(0);
+ public static AtomicLong tot_wkr_task_acquired_rescan = new AtomicLong(0);
+ public static AtomicLong tot_wkr_get_data_queued = new AtomicLong(0);
+ public static AtomicLong tot_wkr_get_data_result = new AtomicLong(0);
+ public static AtomicLong tot_wkr_get_data_retry = new AtomicLong(0);
+ public static AtomicLong tot_wkr_preempt_task = new AtomicLong(0);
+ public static AtomicLong tot_wkr_task_heartbeat_failed = new AtomicLong(0);
+ public static AtomicLong tot_wkr_final_transistion_failed =
+ new AtomicLong(0);
+
+ /**
+  * Sets every static {@link AtomicLong} field declared in this class to 0.
+  * Synthetic, non-static and non-AtomicLong fields are skipped, so a field
+  * injected by tooling cannot cause a ClassCastException or
+  * NullPointerException here.
+  * @throws Exception if a counter field cannot be read reflectively
+  */
+ public static void resetCounters() throws Exception {
+   // No instance is needed to reach the class object.
+   for (Field fld : Counters.class.getDeclaredFields()) {
+     if (fld.isSynthetic()
+         || !java.lang.reflect.Modifier.isStatic(fld.getModifiers())
+         || !AtomicLong.class.isAssignableFrom(fld.getType())) {
+       continue;
+     }
+     ((AtomicLong) fld.get(null)).set(0);
+   }
+ }
+ }
+}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=1094662&r1=1094661&r2=1094662&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Mon Apr 18 17:16:15 2011
@@ -575,7 +575,7 @@ public class ZKUtil {
*
* @param zkw zk reference
* @param znode path of node
- * @param stat node status to set if node exists
+ * @param stat node status to get if node exists
* @return data of the specified znode, or null if node does not exist
* @throws KeeperException if unexpected zookeeper exception
*/
@@ -583,7 +583,7 @@ public class ZKUtil {
Stat stat)
throws KeeperException {
try {
- byte [] data = zkw.getZooKeeper().getData(znode, zkw, stat);
+ byte [] data = zkw.getZooKeeper().getData(znode, null, stat);
logRetrievedMsg(zkw, znode, data, false);
return data;
} catch (KeeperException.NoNodeException e) {
@@ -879,8 +879,7 @@ public class ZKUtil {
*/
public static void asyncCreate(ZooKeeperWatcher zkw,
String znode, byte [] data, final AsyncCallback.StringCallback cb,
- final Object ctx)
- throws KeeperException, KeeperException.NodeExistsException {
+ final Object ctx) {
zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT, cb, ctx);
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java?rev=1094662&r1=1094661&r2=1094662&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java Mon Apr 18 17:16:15 2011
@@ -89,6 +89,8 @@ public class ZooKeeperWatcher implements
public String tableZNode;
// znode containing the unique cluster ID
public String clusterIdZNode;
+ // znode used for log splitting work assignment
+ public String splitLogZNode;
private final Configuration conf;
@@ -165,6 +167,7 @@ public class ZooKeeperWatcher implements
ZKUtil.createAndFailSilent(this, assignmentZNode);
ZKUtil.createAndFailSilent(this, rsZNode);
ZKUtil.createAndFailSilent(this, tableZNode);
+ ZKUtil.createAndFailSilent(this, splitLogZNode);
} catch (KeeperException e) {
throw new ZooKeeperConnectionException(
prefix("Unexpected KeeperException creating base node"), e);
@@ -210,6 +213,8 @@ public class ZooKeeperWatcher implements
conf.get("zookeeper.znode.tableEnableDisable", "table"));
clusterIdZNode = ZKUtil.joinZNode(baseZNode,
conf.get("zookeeper.znode.clusterId", "hbaseid"));
+ splitLogZNode = ZKUtil.joinZNode(baseZNode,
+ conf.get("zookeeper.znode.splitlog", "splitlog"));
}
/**
@@ -247,7 +252,7 @@ public class ZooKeeperWatcher implements
/**
* Method called from ZooKeeper for events and connection status.
- *
+ * <p>
* Valid events are passed along to listeners. Connection status changes
* are dealt with locally.
*/
@@ -302,12 +307,12 @@ public class ZooKeeperWatcher implements
/**
* Called when there is a connection-related event via the Watcher callback.
- *
+ * <p>
* If Disconnected or Expired, this should shutdown the cluster. But, since
* we send a KeeperException.SessionExpiredException along with the abort
* call, it's possible for the Abortable to catch it and try to create a new
* session with ZooKeeper. This is what the client does in HCM.
- *
+ * <p>
* @param event
*/
private void connectionEvent(WatchedEvent event) {
@@ -376,11 +381,11 @@ public class ZooKeeperWatcher implements
/**
* Handles KeeperExceptions in client calls.
- *
+ * <p>
* This may be temporary but for now this gives one place to deal with these.
- *
+ * <p>
* TODO: Currently this method rethrows the exception to let the caller handle
- *
+ * <p>
* @param ke
* @throws KeeperException
*/
@@ -392,13 +397,13 @@ public class ZooKeeperWatcher implements
/**
* Handles InterruptedExceptions in client calls.
- *
+ * <p>
* This may be temporary but for now this gives one place to deal with these.
- *
+ * <p>
* TODO: Currently, this method does nothing.
* Is this ever expected to happen? Do we abort or can we let it run?
* Maybe this should be logged as WARN? It shouldn't happen?
- *
+ * <p>
* @param ie
*/
public void interruptedException(InterruptedException ie) {
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java?rev=1094662&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java Mon Apr 18 17:16:15 2011
@@ -0,0 +1,445 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.OrphanHLogAfterSplitException;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+
+public class TestDistributedLogSplitting {
+ // Log under this test's own class so output is attributed correctly (the
+ // logger was previously created for TestSplitLogManager).
+ private static final Log LOG =
+     LogFactory.getLog(TestDistributedLogSplitting.class);
+ static {
+   // Run all hbase loggers at DEBUG for this test.
+   Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
+ }
+
+ // Default cluster size: 2 masters and 6 regionservers
+ final int NUM_MASTERS = 2;
+ final int NUM_RS = 6;
+
+ MiniHBaseCluster cluster;
+ HMaster master;
+ Configuration conf;
+ HBaseTestingUtility TEST_UTIL;
+
+ @Before
+ public void before() throws Exception {
+
+ }
+
+ /**
+ * Resets the split-log counters, creates a fresh configuration, starts a
+ * mini cluster with NUM_MASTERS masters and the requested number of region
+ * servers, and waits for an active, ready master.
+ * @param num_rs number of region servers to start
+ */
+ private void startCluster(int num_rs) throws Exception{
+ ZKSplitLog.Counters.resetCounters();
+ LOG.info("Starting cluster");
+ conf = HBaseConfiguration.create();
+ // NOTE(review): the value returned by getLong is discarded, so this line
+ // leaves the configuration untouched. Presumably setLong was intended --
+ // confirm first, since that would change resubmit behaviour in the tests.
+ conf.getLong("hbase.splitlog.max.resubmit", 0);
+ TEST_UTIL = new HBaseTestingUtility(conf);
+ TEST_UTIL.startMiniCluster(NUM_MASTERS, num_rs);
+ cluster = TEST_UTIL.getHBaseCluster();
+ LOG.info("Waiting for active/ready master");
+ cluster.waitForActiveAndReadyMaster();
+ master = cluster.getMaster();
+ }
+
+ /** Shuts the mini cluster down after each test. */
+ @After
+ public void after() throws Exception {
+ // NOTE(review): cluster is only assigned in startCluster(); if startup
+ // failed before that point this call throws a NullPointerException.
+ cluster.shutdown();
+ }
+
+ /**
+  * Loads rows into every user region, aborts three of the NUM_RS region
+  * servers, then waits for the survivors to host all regions again and
+  * checks that no row was lost.
+  */
+ @Test
+ public void testThreeRSAbort() throws Exception {
+   LOG.info("testThreeRSAbort");
+   final int NUM_REGIONS_TO_CREATE = 40;
+   final int NUM_ROWS_PER_REGION = 100;
+   final int NUM_RS_TO_ABORT = 3;
+   final long TIMEOUT_MS = 60000;
+
+   startCluster(NUM_RS);
+
+   ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf,
+       "distributed log splitting test", null);
+
+   HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
+   populateDataInTable(NUM_ROWS_PER_REGION, "family");
+
+   List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+   assertEquals(NUM_RS, rsts.size());
+   for (int i = 0; i < NUM_RS_TO_ABORT; i++) {
+     rsts.get(i).getRegionServer().abort("testing");
+   }
+
+   // Wait for the aborted servers to drop out of the live list.
+   long start = EnvironmentEdgeManager.currentTimeMillis();
+   while (cluster.getLiveRegionServerThreads().size()
+       > (NUM_RS - NUM_RS_TO_ABORT)) {
+     if (EnvironmentEdgeManager.currentTimeMillis() - start > TIMEOUT_MS) {
+       fail("Timed out after " + TIMEOUT_MS
+           + " ms waiting for aborted region servers to stop; live="
+           + cluster.getLiveRegionServerThreads().size());
+     }
+     Thread.sleep(200);
+   }
+
+   // Wait for every user region plus the 2 catalog regions to be online.
+   start = EnvironmentEdgeManager.currentTimeMillis();
+   while (getAllOnlineRegions(cluster).size() < (NUM_REGIONS_TO_CREATE + 2)) {
+     if (EnvironmentEdgeManager.currentTimeMillis() - start > TIMEOUT_MS) {
+       fail("Timed out after " + TIMEOUT_MS
+           + " ms waiting for regions to come back online; online="
+           + getAllOnlineRegions(cluster).size());
+     }
+     Thread.sleep(200);
+   }
+
+   assertEquals(NUM_REGIONS_TO_CREATE * NUM_ROWS_PER_REGION,
+       TEST_UTIL.countRows(ht));
+ }
+
+ /**
+ * Creates files in a region server's log directory while that directory is
+ * being split and expects splitLogDistributed to end with an
+ * OrphanHLogAfterSplitException.
+ */
+ @Test(expected=OrphanHLogAfterSplitException.class)
+ public void testOrphanLogCreation() throws Exception {
+ LOG.info("testOrphanLogCreation");
+ startCluster(NUM_RS);
+ final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
+ final FileSystem fs = master.getMasterFileSystem().getFileSystem();
+
+ List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+ HRegionServer hrs = rsts.get(0).getRegionServer();
+ Path rootdir = FSUtils.getRootDir(conf);
+ final Path logDir = new Path(rootdir,
+ HLog.getHLogDirectoryName(hrs.getServerName()));
+
+ installTable(new ZooKeeperWatcher(conf, "table-creation", null),
+ "table", "family", 40);
+
+ makeHLog(hrs.getWAL(), hrs.getOnlineRegions(), "table",
+ 1000, 100);
+
+ // Background thread: once a split batch has started, keep creating files
+ // in logDir until file creation throws.
+ new Thread() {
+ public void run() {
+ while (true) {
+ // NOTE(review): i is re-initialised on every pass of the outer loop, so
+ // the file name is always "foo0" despite the i++ below -- confirm
+ // whether distinct names were intended.
+ int i = 0;
+ try {
+ while(ZKSplitLog.Counters.tot_mgr_log_split_batch_start.get() ==
+ 0) {
+ Thread.yield();
+ }
+ fs.createNewFile(new Path(logDir, "foo" + i++));
+ } catch (Exception e) {
+ LOG.debug("file creation failed", e);
+ return;
+ }
+ }
+ }
+ }.start();
+ slm.splitLogDistributed(logDir);
+ // Only reached if the expected exception was not thrown: log what is left.
+ FileStatus[] files = fs.listStatus(logDir);
+ if (files != null) {
+ for (FileStatus file : files) {
+ LOG.debug("file still there " + file.getPath());
+ }
+ }
+ }
+
+ /**
+ * Writes NUM_LOG_LINES edits into the first region server's WAL, spread
+ * over its non-catalog regions, splits that server's log directory and
+ * checks that each of those regions has exactly one recovered-edits file
+ * and that the files together hold NUM_LOG_LINES edits.
+ */
+ @Test
+ public void testRecoveredEdits() throws Exception {
+ LOG.info("testRecoveredEdits");
+ startCluster(NUM_RS);
+ final int NUM_LOG_LINES = 1000;
+ final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
+ // turn off load balancing to prevent regions from moving around otherwise
+ // they will consume recovered.edits
+ master.balanceSwitch(false);
+ FileSystem fs = master.getMasterFileSystem().getFileSystem();
+
+ List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+ HRegionServer hrs = rsts.get(0).getRegionServer();
+ Path rootdir = FSUtils.getRootDir(conf);
+ final Path logDir = new Path(rootdir,
+ HLog.getHLogDirectoryName(hrs.getServerName()));
+
+ installTable(new ZooKeeperWatcher(conf, "table-creation", null),
+ "table", "family", 40);
+ byte[] table = Bytes.toBytes("table");
+ List<HRegionInfo> regions = hrs.getOnlineRegions();
+ LOG.info("#regions = " + regions.size());
+ // Drop the catalog regions so that edits are only written for, and later
+ // looked up under, the test table.
+ Iterator<HRegionInfo> it = regions.iterator();
+ while (it.hasNext()) {
+ HRegionInfo region = it.next();
+ if (region.isMetaRegion() || region.isRootRegion()) {
+ it.remove();
+ }
+ }
+ makeHLog(hrs.getWAL(), regions, "table",
+ NUM_LOG_LINES, 100);
+
+ slm.splitLogDistributed(logDir);
+
+ // Sum the edits found in each region's recovered-edits directory.
+ int count = 0;
+ for (HRegionInfo hri : regions) {
+
+ Path tdir = HTableDescriptor.getTableDir(rootdir, table);
+ Path editsdir =
+ HLog.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
+ hri.getEncodedName()));
+ LOG.debug("checking edits dir " + editsdir);
+ FileStatus[] files = fs.listStatus(editsdir);
+ assertEquals(1, files.length);
+ int c = countHLog(files[0].getPath(), fs, conf);
+ count += c;
+ LOG.info(c + " edits in " + files[0].getPath());
+ }
+ assertEquals(NUM_LOG_LINES, count);
+ }
+
+ /**
+ * Starts a single region server, installs a split task for one of its log
+ * files and aborts the server once a worker has acquired a task. Passes if
+ * exactly one of the resigned / err / final-transition-failed worker
+ * counters fires within 30 seconds, or otherwise if the batch reports one
+ * task done.
+ */
+ @Test
+ public void testWorkerAbort() throws Exception {
+ LOG.info("testWorkerAbort");
+ startCluster(1);
+ final int NUM_LOG_LINES = 10000;
+ final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
+ FileSystem fs = master.getMasterFileSystem().getFileSystem();
+
+ final List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+ HRegionServer hrs = rsts.get(0).getRegionServer();
+ Path rootdir = FSUtils.getRootDir(conf);
+ final Path logDir = new Path(rootdir,
+ HLog.getHLogDirectoryName(hrs.getServerName()));
+
+ installTable(new ZooKeeperWatcher(conf, "table-creation", null),
+ "table", "family", 40);
+ // NOTE(review): table is never read in this method.
+ byte[] table = Bytes.toBytes("table");
+ makeHLog(hrs.getWAL(), hrs.getOnlineRegions(), "table",
+ NUM_LOG_LINES, 100);
+
+ // Background thread: wait up to 1000 ms for a worker to acquire a task,
+ // then abort every region server. An assertion failure inside
+ // waitForCounter is raised on this thread, not on the test thread.
+ new Thread() {
+ public void run() {
+ waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
+ for (RegionServerThread rst : rsts) {
+ rst.getRegionServer().abort("testing");
+ }
+ }
+ }.start();
+ // slm.splitLogDistributed(logDir);
+ FileStatus[] logfiles = fs.listStatus(logDir);
+ TaskBatch batch = new TaskBatch();
+ slm.installTask(logfiles[0].getPath().toString(), batch);
+ //waitForCounter but for one of the 2 counters
+ long curt = System.currentTimeMillis();
+ long endt = curt + 30000;
+ while (curt < endt) {
+ if ((tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
+ tot_wkr_final_transistion_failed.get()) == 0) {
+ Thread.yield();
+ curt = System.currentTimeMillis();
+ } else {
+ assertEquals(1, (tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
+ tot_wkr_final_transistion_failed.get()));
+ return;
+ }
+ }
+ // Timed out without any failure counter firing: the task must then have
+ // been completed.
+ assertEquals(1, batch.done);
+ // fail("region server completed the split before aborting");
+ return;
+ }
+
+ /**
+ * Creates a table with one family, splits it into nrs regions and waits
+ * until they are all assigned. A disable/enable cycle in between clears
+ * regions left behind by createMultiRegions.
+ * @param zkw watcher used to wait for regions in transition
+ * @param tname table name
+ * @param fname column family name
+ * @param nrs number of regions to create
+ * @return handle to the created table
+ */
+ HTable installTable(ZooKeeperWatcher zkw, String tname, String fname,
+ int nrs ) throws Exception {
+ // Create a table with regions
+ byte [] table = Bytes.toBytes(tname);
+ byte [] family = Bytes.toBytes(fname);
+ LOG.info("Creating table with " + nrs + " regions");
+ HTable ht = TEST_UTIL.createTable(table, family);
+ int numRegions = TEST_UTIL.createMultiRegions(conf, ht, family, nrs);
+ assertEquals(nrs, numRegions);
+ LOG.info("Waiting for no more RIT\n");
+ blockUntilNoRIT(zkw, master);
+ // disable-enable cycle to get rid of table's dead regions left behind
+ // by createMultiRegions
+ LOG.debug("Disabling table\n");
+ TEST_UTIL.getHBaseAdmin().disableTable(table);
+ LOG.debug("Waiting for no more RIT\n");
+ blockUntilNoRIT(zkw, master);
+ NavigableSet<String> regions = getAllOnlineRegions(cluster);
+ LOG.debug("Verifying only catalog regions are assigned\n");
+ // On mismatch, log the unexpected regions before the assertion fails.
+ if (regions.size() != 2) {
+ for (String oregion : regions)
+ LOG.debug("Region still online: " + oregion);
+ }
+ assertEquals(2, regions.size());
+ LOG.debug("Enabling table\n");
+ TEST_UTIL.getHBaseAdmin().enableTable(table);
+ LOG.debug("Waiting for no more RIT\n");
+ blockUntilNoRIT(zkw, master);
+ LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
+ regions = getAllOnlineRegions(cluster);
+ // The table's regions plus the 2 catalog regions.
+ assertEquals(numRegions + 2, regions.size());
+ return ht;
+ }
+
+ /**
+  * Puts nrows rows into every non-catalog region hosted by each live region
+  * server. Asserts that exactly NUM_RS region servers are live.
+  * @param nrows number of rows to write per region
+  * @param fname column family to write to
+  */
+ void populateDataInTable(int nrows, String fname) throws Exception {
+   final byte[] familyBytes = Bytes.toBytes(fname);
+
+   List<RegionServerThread> liveServers = cluster.getLiveRegionServerThreads();
+   assertEquals(NUM_RS, liveServers.size());
+
+   for (RegionServerThread serverThread : liveServers) {
+     HRegionServer server = serverThread.getRegionServer();
+     for (HRegionInfo info : server.getOnlineRegions()) {
+       boolean isCatalog = info.isMetaRegion() || info.isRootRegion();
+       if (!isCatalog) {
+         LOG.debug("adding data to rs = " + serverThread.getName() +
+             " region = "+ info.getRegionNameAsString());
+         HRegion region = server.getOnlineRegion(info.getRegionName());
+         assertTrue(region != null);
+         putData(region, info.getStartKey(), nrows, Bytes.toBytes("q"),
+             familyBytes);
+       }
+     }
+   }
+ }
+
+ /**
+  * Appends num_edits edits to the given log, distributing them round-robin
+  * over the given regions (the first edit goes to index 1 % size), then
+  * syncs and closes the log and logs how many edits each region received.
+  * @param log log to append to; it is closed on return
+  * @param hris regions to spread the edits over
+  * @param tname table name recorded with every edit
+  * @param num_edits number of edits to append
+  * @param edit_size size in bytes of each edit's value
+  */
+ public void makeHLog(HLog log,
+     List<HRegionInfo> hris, String tname,
+     int num_edits, int edit_size) throws IOException {
+   final byte[] tableName = Bytes.toBytes(tname);
+   // Value payload: 'a'..'z' repeated up to edit_size bytes.
+   final byte[] payload = new byte[edit_size];
+   for (int k = 0; k < payload.length; k++) {
+     payload[k] = (byte)('a' + (k % 26));
+   }
+   final int regionCount = hris.size();
+   final int[] editsPerRegion = new int[regionCount];
+   for (int editNo = 0; editNo < num_edits; editNo++) {
+     WALEdit edit = new WALEdit();
+     byte[] row = Bytes.toBytes("r" + editNo);
+     byte[] family = Bytes.toBytes("f");
+     byte[] qualifier = Bytes.toBytes("c" + editNo);
+     edit.add(new KeyValue(row, family, qualifier,
+         System.currentTimeMillis(), payload));
+     int target = (editNo + 1) % regionCount;
+     log.append(hris.get(target), tableName, edit,
+         System.currentTimeMillis());
+     editsPerRegion[target] += 1;
+   }
+   log.sync();
+   log.close();
+   for (int r = 0; r < regionCount; r++) {
+     LOG.info("region " + hris.get(r).getRegionNameAsString() +
+         " has " + editsPerRegion[r] + " edits");
+   }
+ }
+
+ /**
+  * Counts the entries in a log file.
+  * @param log path of the log file to read
+  * @param fs file system holding the file
+  * @param conf configuration used to obtain the reader
+  * @return number of entries read before the reader returned null
+  * @throws IOException if the file cannot be opened, read or closed
+  */
+ private int countHLog(Path log, FileSystem fs, Configuration conf)
+     throws IOException {
+   int count = 0;
+   HLog.Reader in = HLog.getReader(fs, log, conf);
+   try {
+     while (in.next() != null) {
+       count++;
+     }
+   } finally {
+     // Release the underlying stream even if reading fails part-way.
+     in.close();
+   }
+   return count;
+ }
+
+ /**
+ * Calls ZKAssign.blockUntilNoRIT for the watcher, then asks the master's
+ * assignment manager to wait (60000 passed as the timeout argument) until
+ * no regions are in transition.
+ */
+ private void blockUntilNoRIT(ZooKeeperWatcher zkw, HMaster master)
+ throws KeeperException, InterruptedException {
+ ZKAssign.blockUntilNoRIT(zkw);
+ master.assignmentManager.waitUntilNoRegionsInTransition(60000);
+ }
+
+ /**
+ * Thin wrapper that delegates to ZKAssign.blockUntilRIT. Not called from
+ * any test visible in this file.
+ */
+ private void blockUntilRIT(ZooKeeperWatcher zkw)
+ throws KeeperException, InterruptedException {
+ ZKAssign.blockUntilRIT(zkw);
+ }
+
+ /**
+  * Writes numRows rows into a region. Row i has the key startRow followed by
+  * the bytes of int i, and carries one null-valued cell per family under the
+  * qualifier qf.
+  * @param region region to write to
+  * @param startRow prefix of every row key
+  * @param numRows number of rows to write
+  * @param qf column qualifier
+  * @param families column families that get a cell in every row
+  */
+ private void putData(HRegion region, byte[] startRow, int numRows, byte [] qf,
+     byte [] ...families)
+     throws IOException {
+   int rowIndex = 0;
+   while (rowIndex < numRows) {
+     byte[] rowKey = Bytes.add(startRow, Bytes.toBytes(rowIndex));
+     Put put = new Put(rowKey);
+     for (int f = 0; f < families.length; f++) {
+       put.add(families[f], qf, null);
+     }
+     region.put(put);
+     rowIndex++;
+   }
+ }
+
+ /**
+  * @param cluster cluster to inspect
+  * @return sorted set of the names of all regions reported online by the
+  *         cluster's live region servers
+  */
+ private NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster) {
+   NavigableSet<String> regionNames = new TreeSet<String>();
+   List<RegionServerThread> live = cluster.getLiveRegionServerThreads();
+   for (RegionServerThread serverThread : live) {
+     List<HRegionInfo> hosted =
+         serverThread.getRegionServer().getOnlineRegions();
+     for (HRegionInfo info : hosted) {
+       regionNames.add(info.getRegionNameAsString());
+     }
+   }
+   return regionNames;
+ }
+
+ /**
+  * Busy-waits until a counter moves away from oldval, then asserts that it
+  * now holds newval. Fails with a descriptive message if the counter has not
+  * changed within the timeout.
+  * @param ctr counter to watch
+  * @param oldval value the counter is expected to hold initially
+  * @param newval value the counter must hold once it changes
+  * @param timems maximum time to wait, in milliseconds
+  */
+ private void waitForCounter(AtomicLong ctr, long oldval, long newval,
+     long timems) {
+   long endt = System.currentTimeMillis() + timems;
+   while (System.currentTimeMillis() < endt) {
+     if (ctr.get() != oldval) {
+       assertEquals(newval, ctr.get());
+       return;
+     }
+     Thread.yield();
+   }
+   fail("Counter still at " + oldval + " after " + timems
+       + " ms; expected it to become " + newval);
+ }
+}
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java?rev=1094662&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java Mon Apr 18 17:16:15 2011
@@ -0,0 +1,432 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.master.SplitLogManager.Task;
+import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
+import org.apache.hadoop.hbase.regionserver.TestMasterAddressManager.NodeCreationListener;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestSplitLogManager {
+ private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class);
+ static {
+   // log everything under org.apache.hadoop.hbase at DEBUG while these tests run
+   Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
+ }
+
+ private ZooKeeperWatcher zkw;
+ // volatile: set by the test thread through stopper.stop() and read through
+ // stopper.isStopped(); the stopper is handed to SplitLogManager, which may
+ // poll it from a thread other than the test thread.
+ private static volatile boolean stopped = false;
+ private SplitLogManager slm;
+ private Configuration conf;
+
+ private final static HBaseTestingUtility TEST_UTIL =
+     new HBaseTestingUtility();
+
+ // Stoppable passed to every SplitLogManager created by the tests; it simply
+ // records and reports the shared 'stopped' flag.
+ static Stoppable stopper = new Stoppable() {
+   @Override
+   public void stop(String why) {
+     stopped = true;
+   }
+
+   @Override
+   public boolean isStopped() {
+     return stopped;
+   }
+
+ };
+
+ /**
+ * Class-level setup hook; currently does nothing. The mini ZooKeeper cluster
+ * is started per test in setup().
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ }
+
+ /**
+ * Class-level teardown hook; currently does nothing. The mini ZooKeeper
+ * cluster is shut down per test in teardown().
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ }
+
+ /**
+  * Per-test fixture: starts the mini ZooKeeper cluster, opens a watcher,
+  * clears everything under the base znode, recreates the base and split-log
+  * znodes, and resets the stop flag and the counters.
+  */
+ @Before
+ public void setup() throws Exception {
+   TEST_UTIL.startMiniZKCluster();
+   conf = TEST_UTIL.getConfiguration();
+   zkw = new ZooKeeperWatcher(conf, "split-log-manager-tests", null);
+   ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode);
+   createAndVerifyZNode(zkw.baseZNode);
+   createAndVerifyZNode(zkw.splitLogZNode);
+
+   stopped = false;
+   resetCounters();
+ }
+
+ /** Creates the znode if absent, asserts that it exists, and logs the fact. */
+ private void createAndVerifyZNode(String znode) throws Exception {
+   ZKUtil.createAndFailSilent(zkw, znode);
+   assertTrue(ZKUtil.checkExists(zkw, znode) != -1);
+   LOG.debug(znode + " created");
+ }
+
+ /**
+  * Per-test cleanup: flips the stop flag, stops the manager if the test got as
+  * far as creating one, and always shuts down the mini ZooKeeper cluster.
+  */
+ @After
+ public void teardown() throws IOException, KeeperException {
+   try {
+     stopper.stop("");
+     // slm is assigned inside each test method; it is still null when a test
+     // failed before constructing the manager.
+     if (slm != null) {
+       slm.stop();
+     }
+   } finally {
+     // shut the cluster down even if stopping the manager threw, so the next
+     // test's setup() starts from a clean state
+     TEST_UTIL.shutdownMiniZKCluster();
+   }
+ }
+
+ /**
+  * Polls ctr every 10 ms until it moves away from oldval, then asserts that
+  * the value observed at that moment equals newval. Fails if ctr still equals
+  * oldval after timems milliseconds, or if the waiting thread is interrupted.
+  *
+  * @param ctr counter to watch
+  * @param oldval value the counter is expected to hold initially
+  * @param newval value the counter is expected to change to
+  * @param timems maximum time to wait, in milliseconds
+  */
+ private void waitForCounter(AtomicLong ctr, long oldval, long newval,
+     long timems) {
+   long curt = System.currentTimeMillis();
+   long endt = curt + timems;
+   while (curt < endt) {
+     // Read the counter once: the value compared against oldval must be the
+     // same value that is asserted, otherwise a later update seen by a second
+     // get() could fail the assertion spuriously.
+     long observed = ctr.get();
+     if (observed != oldval) {
+       assertEquals("counter left " + oldval + " for an unexpected value",
+           newval, observed);
+       return;
+     }
+     try {
+       Thread.sleep(10);
+     } catch (InterruptedException e) {
+       // keep the interrupt visible to the caller and stop waiting
+       Thread.currentThread().interrupt();
+       fail("interrupted while waiting for counter to leave " + oldval);
+     }
+     curt = System.currentTimeMillis();
+   }
+   fail("counter still " + oldval + " after waiting " + timems + " ms");
+ }
+
+ /**
+  * Counts the children of the split-log znode that ZKSplitLog.isRescanNode
+  * recognises as rescan nodes.
+  */
+ private int numRescanPresent() throws KeeperException {
+   int rescanCount = 0;
+   for (String child : ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode)) {
+     String childPath = ZKSplitLog.getNodeName(zkw, child);
+     if (ZKSplitLog.isRescanNode(zkw, childPath)) {
+       rescanCount++;
+     }
+   }
+   return rescanCount;
+ }
+
+ /**
+  * Overwrites every rescan node under the split-log znode with the TASK_DONE
+  * state for "some-worker" and asserts that exactly count such nodes were
+  * found.
+  */
+ private void setRescanNodeDone(int count) throws KeeperException {
+   int remaining = count;
+   for (String child : ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode)) {
+     String childPath = ZKSplitLog.getNodeName(zkw, child);
+     if (!ZKSplitLog.isRescanNode(zkw, childPath)) {
+       continue;
+     }
+     ZKUtil.setData(zkw, childPath, TaskState.TASK_DONE.get("some-worker"));
+     remaining--;
+   }
+   assertEquals(0, remaining);
+ }
+
+ /**
+  * Installs a task called name into the manager as part of batch and blocks
+  * until the corresponding task znode has been created.
+  *
+  * @param batch batch the task is installed into; expected to be empty
+  * @param name task name, used both for the znode path and for installTask
+  * @return the path of the task znode
+  */
+ private String submitTaskAndWait(TaskBatch batch, String name)
+     throws KeeperException, InterruptedException {
+   // honour the caller-supplied name instead of a hard-coded "foo"
+   String tasknode = ZKSplitLog.getNodeName(zkw, name);
+   // register the listener and set the watch before installing the task so
+   // the creation event cannot be missed
+   NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
+   zkw.registerListener(listener);
+   ZKUtil.watchAndCheckExists(zkw, tasknode);
+
+   slm.installTask(name, batch);
+   assertEquals(1, batch.installed);
+   assertTrue(slm.findOrCreateOrphanTask(tasknode).batch == batch);
+   assertEquals(1L, tot_mgr_node_create_queued.get());
+
+   LOG.debug("waiting for task node creation");
+   listener.waitForCreation();
+   LOG.debug("task created");
+   return tasknode;
+ }
+
+ /**
+  * Installs one task through the manager and checks that the data stored in
+  * the task znode matches TASK_UNASSIGNED for "dummy-master".
+  * @throws Exception
+  */
+ @Test
+ public void testTaskCreation() throws Exception {
+   LOG.info("TestTaskCreation - test the creation of a task in zk");
+
+   slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+   slm.finishInitialization();
+
+   String taskPath = submitTaskAndWait(new TaskBatch(), "foo");
+   byte[] taskData = ZKUtil.getData(zkw, taskPath);
+   LOG.info("Task node created " + new String(taskData));
+
+   boolean unassignedByMaster =
+       TaskState.TASK_UNASSIGNED.equals(taskData, "dummy-master");
+   assertTrue(unassignedByMaster);
+ }
+
+ /**
+ * Creates a task znode in OWNED state before the manager exists, then checks
+ * through the counters that the manager acquires it as an orphan, records a
+ * heartbeat for it, and later resubmits it and leaves one rescan node.
+ */
+ @Test
+ public void testOrphanTaskAcquisition() throws Exception {
+ LOG.info("TestOrphanTaskAcquisition");
+
+ // write the task znode directly, bypassing the manager
+ String tasknode = ZKSplitLog.getNodeName(zkw, "orphan");
+ zkw.getZooKeeper().create(tasknode,
+ TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ int to = 1000;
+ conf.setInt("hbase.splitlog.manager.timeout", to);
+ conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
+ // wait budget: the configured timeout plus two timeout-monitor periods
+ to = to + 2 * 100;
+
+
+ slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm.finishInitialization();
+ waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
+ Task task = slm.findOrCreateOrphanTask(tasknode);
+ assertTrue(task.isOrphan());
+ waitForCounter(tot_mgr_heartbeat, 0, 1, 100);
+ assertFalse(task.isUnassigned());
+ // last_update must fall within the last second
+ long curt = System.currentTimeMillis();
+ assertTrue((task.last_update <= curt) &&
+ (task.last_update > (curt - 1000)));
+ LOG.info("waiting for manager to resubmit the orphan task");
+ waitForCounter(tot_mgr_resubmit, 0, 1, to + 100);
+ assertTrue(task.isUnassigned());
+ waitForCounter(tot_mgr_rescan, 0, 1, to + 100);
+ assertEquals(1, numRescanPresent());
+ }
+
+ /**
+ * Creates a task znode in UNASSIGNED state before the manager exists and
+ * checks that, after startup, the manager has acquired it as an orphan,
+ * counted one resubmit, bumped the znode version and left one rescan node.
+ */
+ @Test
+ public void testUnassignedOrphan() throws Exception {
+ LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" +
+ " startup");
+ String tasknode = ZKSplitLog.getNodeName(zkw, "orphan");
+ //create an unassigned orphan task
+ zkw.getZooKeeper().create(tasknode,
+ TaskState.TASK_UNASSIGNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ // remember the znode version so a later rewrite can be detected
+ int version = ZKUtil.checkExists(zkw, tasknode);
+
+ slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm.finishInitialization();
+ waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
+ Task task = slm.findOrCreateOrphanTask(tasknode);
+ assertTrue(task.isOrphan());
+ assertTrue(task.isUnassigned());
+ // wait for RESCAN node to be created
+ waitForCounter(tot_mgr_rescan, 0, 1, 500);
+ // a second lookup must hand back the very same Task object
+ Task task2 = slm.findOrCreateOrphanTask(tasknode);
+ assertTrue(task == task2);
+ LOG.debug("task = " + task);
+ assertEquals(1L, tot_mgr_resubmit.get());
+ assertEquals(1, task.incarnation);
+ assertEquals(0, task.unforcedResubmits);
+ assertTrue(task.isOrphan());
+ assertTrue(task.isUnassigned());
+ assertTrue(ZKUtil.checkExists(zkw, tasknode) > version);
+ assertEquals(1, numRescanPresent());
+ }
+
+ /**
+  * With hbase.splitlog.max.resubmit set to 2, marks the task OWNED three times
+  * in a row and checks that the manager resubmits it only twice, then counts
+  * a resubmit-threshold-reached event instead of resubmitting again.
+  */
+ @Test
+ public void testMultipleResubmits() throws Exception {
+   LOG.info("TestMultipleResbmits - no indefinite resubmissions");
+
+   int to = 1000;
+   conf.setInt("hbase.splitlog.manager.timeout", to);
+   conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
+   // wait budget: the configured timeout plus two timeout-monitor periods
+   to = to + 2 * 100;
+
+   conf.setInt("hbase.splitlog.max.resubmit", 2);
+   try {
+     slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+     slm.finishInitialization();
+     TaskBatch batch = new TaskBatch();
+
+     String tasknode = submitTaskAndWait(batch, "foo");
+     int version = ZKUtil.checkExists(zkw, tasknode);
+
+     // first owner: heartbeat 0 -> 1, then first resubmit
+     ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1"));
+     waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
+     waitForCounter(tot_mgr_resubmit, 0, 1, to + 100);
+     int version1 = ZKUtil.checkExists(zkw, tasknode);
+     assertTrue(version1 > version);
+
+     // second owner: heartbeat 1 -> 2, then second resubmit
+     ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker2"));
+     waitForCounter(tot_mgr_heartbeat, 1, 2, 1000);
+     waitForCounter(tot_mgr_resubmit, 1, 2, to + 100);
+     int version2 = ZKUtil.checkExists(zkw, tasknode);
+     assertTrue(version2 > version1);
+
+     // third owner: the heartbeat counter is already 2 here, so wait for
+     // 2 -> 3; waiting on 1 -> 2 would return without observing this heartbeat
+     ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker3"));
+     waitForCounter(tot_mgr_heartbeat, 2, 3, 1000);
+     waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + 100);
+     assertEquals(2, numRescanPresent());
+     // give the manager one more timeout window; it must not resubmit again
+     Thread.sleep(to + 100);
+     assertEquals(2L, tot_mgr_resubmit.get());
+   } finally {
+     // conf belongs to the shared TEST_UTIL; do not leak the lowered limit
+     // into the tests that run afterwards
+     conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLog.DEFAULT_MAX_RESUBMIT);
+   }
+ }
+
+ /**
+  * Lets an OWNED task time out so the manager resubmits it and creates a
+  * rescan node, marks that rescan node done, and checks that the manager
+  * deletes it.
+  */
+ @Test
+ public void testRescanCleanup() throws Exception {
+   LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
+
+   int timeoutMs = 1000;
+   conf.setInt("hbase.splitlog.manager.timeout", timeoutMs);
+   conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
+   // the timeout, two monitor periods, and 100 ms of slack
+   int resubmitWaitMs = timeoutMs + 2 * 100 + 100;
+   slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+   slm.finishInitialization();
+
+   String taskPath = submitTaskAndWait(new TaskBatch(), "foo");
+   int versionBefore = ZKUtil.checkExists(zkw, taskPath);
+
+   ZKUtil.setData(zkw, taskPath, TaskState.TASK_OWNED.get("worker1"));
+   waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
+   waitForCounter(tot_mgr_resubmit, 0, 1, resubmitWaitMs);
+   int versionAfter = ZKUtil.checkExists(zkw, taskPath);
+   assertTrue(versionAfter > versionBefore);
+   assertEquals(1, numRescanPresent());
+
+   byte[] actualState = ZKUtil.getData(zkw, taskPath);
+   byte[] expectedState = TaskState.TASK_UNASSIGNED.get("dummy-master");
+   assertTrue(Arrays.equals(expectedState, actualState));
+
+   setRescanNodeDone(1);
+
+   waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000);
+
+   assertEquals(0, numRescanPresent());
+ }
+
+ /**
+ * Marks an installed task DONE and checks that the batch's done count catches
+ * up with its installed count and that the task znode is then deleted.
+ */
+ @Test
+ public void testTaskDone() throws Exception {
+ LOG.info("TestTaskDone - cleanup task node once in DONE state");
+
+ slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm.finishInitialization();
+ TaskBatch batch = new TaskBatch();
+ String tasknode = submitTaskAndWait(batch, "foo");
+ ZKUtil.setData(zkw, tasknode, TaskState.TASK_DONE.get("worker"));
+ // wait on the batch monitor until done == installed; there is no timeout,
+ // so this relies on the batch being notified (presumably by the manager --
+ // TODO confirm)
+ synchronized (batch) {
+ while (batch.installed != batch.done) {
+ batch.wait();
+ }
+ }
+ waitForCounter(tot_mgr_task_deleted, 0, 1, 1000);
+ // checkExists returning -1 is treated as "znode is gone"
+ assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
+ }
+
+ @Test
+ public void testTaskErr() throws Exception {
+ LOG.info("TestTaskErr - cleanup task node once in ERR state");
+
+ conf.setInt("hbase.splitlog.max.resubmit", 0);
+ slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm.finishInitialization();
+ TaskBatch batch = new TaskBatch();
+
+ String tasknode = submitTaskAndWait(batch, "foo");
+ ZKUtil.setData(zkw, tasknode, TaskState.TASK_ERR.get("worker"));
+ synchronized (batch) {
+ while (batch.installed != batch.error) {
+ batch.wait();
+ }
+ }
+ waitForCounter(tot_mgr_task_deleted, 0, 1, 1000);
+ assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
+ conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLog.DEFAULT_MAX_RESUBMIT);
+ }
+
+ /**
+ * Marks an installed task RESIGNED and checks that the manager counts one
+ * resubmit, rewrites the znode (higher version) back to TASK_UNASSIGNED for
+ * "dummy-master", and leaves one rescan node.
+ */
+ @Test
+ public void testTaskResigned() throws Exception {
+ LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
+
+ slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm.finishInitialization();
+ TaskBatch batch = new TaskBatch();
+ String tasknode = submitTaskAndWait(batch, "foo");
+ ZKUtil.setData(zkw, tasknode, TaskState.TASK_RESIGNED.get("worker"));
+ // version as read right after writing RESIGNED
+ int version = ZKUtil.checkExists(zkw, tasknode);
+
+ waitForCounter(tot_mgr_resubmit, 0, 1, 1000);
+ int version1 = ZKUtil.checkExists(zkw, tasknode);
+ assertTrue(version1 > version);
+ assertEquals(1, numRescanPresent());
+
+ byte[] taskstate = ZKUtil.getData(zkw, tasknode);
+ assertTrue(Arrays.equals(taskstate,
+ TaskState.TASK_UNASSIGNED.get("dummy-master")));
+ }
+
+ /**
+ * Runs one OWNED orphan task alongside one unassigned task. While the orphan
+ * is rewritten regularly, no rescan node may appear. Once the rewrites stop,
+ * the test expects one resubmit (first rescan node) followed by one
+ * resubmit_unassigned event (second rescan node).
+ */
+ @Test
+ public void testUnassignedTimeout() throws Exception {
+ LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then" +
+ " resubmit");
+
+ // create an orphan task in OWNED state
+ String tasknode1 = ZKSplitLog.getNodeName(zkw, "orphan");
+ zkw.getZooKeeper().create(tasknode1,
+ TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ int to = 1000;
+ conf.setInt("hbase.splitlog.manager.timeout", to);
+ conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
+ conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
+
+
+ slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm.finishInitialization();
+ waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
+
+
+ // submit another task which will stay in unassigned mode
+ TaskBatch batch = new TaskBatch();
+ submitTaskAndWait(batch, "foo");
+
+ // rewrite the orphan owned node every 100 ms for (3 * to) / 100 rounds,
+ // i.e. for roughly 3 * to milliseconds in total
+ for (int i = 0; i < (3 * to)/100; i++) {
+ Thread.sleep(100);
+ ZKUtil.setData(zkw, tasknode1,
+ TaskState.TASK_OWNED.get("dummy-worker"));
+ }
+
+ // not every node in the system is unassigned (the orphan is still owned),
+ // so the unassigned timeout must not have kicked in
+ assertEquals(0, numRescanPresent());
+
+ // the owned node is no longer being rewritten, so it should
+ // get resubmitted
+ LOG.info("waiting for manager to resubmit the orphan task");
+ waitForCounter(tot_mgr_resubmit, 0, 1, to + 500);
+ assertEquals(1, numRescanPresent());
+
+ // now all the nodes are unassigned. manager should post another rescan
+ waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + 500);
+ assertEquals(2, numRescanPresent());
+ }
+}