You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/25 00:30:32 UTC
svn commit: r1188419 [2/3] - in /hbase/branches/0.89-fb: ./
src/main/java/org/apache/hadoop/hbase/mapreduce/
src/main/java/org/apache/hadoop/hbase/master/
src/main/java/org/apache/hadoop/hbase/regionserver/
src/main/java/org/apache/hadoop/hbase/regions...
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1188419&r1=1188418&r2=1188419&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Mon Oct 24 22:30:31 2011
@@ -1606,7 +1606,7 @@ public class HLog implements Syncable {
return pattern.matcher(filename).matches();
}
  /**
   * @return the archive location for log <code>p</code>: a path under
   *         <code>oldLogDir</code> keeping the log's original file name.
   */
  static Path getHLogArchivePath(Path oldLogDir, Path p) {
    return new Path(oldLogDir, p.getName());
  }
@@ -1756,7 +1756,8 @@ public class HLog implements Syncable {
WriterAndPath wap = logWriters.get(region);
for (Entry logEntry: entries) {
if (wap == null) {
- Path regionedits = getRegionSplitEditsPath(fs, logEntry, rootDir);
+ Path regionedits = getRegionSplitEditsPath(fs, logEntry, rootDir,
+ true);
if (fs.exists(regionedits)) {
LOG.warn("Found existing old edits file. It could be the " +
"result of a previous failed split attempt. Deleting " +
@@ -1799,54 +1800,66 @@ public class HLog implements Syncable {
* @param conf
* @throws IOException
*/
  /**
   * Archive the logs of a completed split: corrupted logs are moved to the
   * corrupt directory (config <code>hbase.regionserver.hlog.splitlog.corrupt.dir</code>,
   * default ".corrupt" under the hbase root) and successfully processed logs
   * are moved to <code>oldLogDir</code>. mkdir and rename failures are only
   * logged as warnings, not thrown.
   *
   * @param corruptedLogs logs that could not be parsed
   * @param processedLogs logs that were split successfully
   * @param oldLogDir destination for processed logs
   * @param fs filesystem the logs live on
   * @param conf configuration (supplies root and corrupt dir locations)
   * @throws IOException on filesystem access errors
   */
  static void archiveLogs(final List<Path> corruptedLogs,
      final List<Path> processedLogs, final Path oldLogDir,
      final FileSystem fs, final Configuration conf)
  throws IOException{
    final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR),
        conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));
    // Create target dirs up front; a false return from mkdirs is survivable
    // here because the renames below will warn individually.
    if (!fs.exists(corruptDir) && !fs.mkdirs(corruptDir)) {
      LOG.warn("Unable to mkdir " + corruptDir);
    }
    if (!fs.exists(oldLogDir) && !fs.mkdirs(oldLogDir)) {
      LOG.warn("Unable to mkdir " + oldLogDir);
    }
    for (Path corrupted: corruptedLogs) {
      Path p = new Path(corruptDir, corrupted.getName());
      if (!fs.rename(corrupted, p)) {
        LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
      } else {
        LOG.info("Moving corrupted log " + corrupted + " to " + p);
      }
    }
    for (Path p: processedLogs) {
      Path newPath = getHLogArchivePath(oldLogDir, p);
      if (!fs.rename(p, newPath)) {
        LOG.warn("Unable to move processed log " + p + " to " + newPath);
      } else {
        LOG.info("Archived processed log " + p + " to " + newPath);
      }
    }
  }
  /**
   * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
   * <code>logEntry</code> named for the sequenceid in the passed
   * <code>logEntry</code>: e.g.
   * /hbase/some_table/2323432434/recovered.edits/2332. This method also ensures
   * existence of RECOVERED_EDITS_DIR under the region creating it if necessary.
   *
   * @param fs filesystem to create the directory on
   * @param logEntry entry whose key names the table/region/sequenceid
   * @param rootDir HBase root dir.
   * @param isCreate if true create the directory, otherwise just return the name
   * @return Path to file into which to dump split log edits.
   * @throws IOException on filesystem errors while checking/creating the dir
   */
  static Path getRegionSplitEditsPath(final FileSystem fs,
      final Entry logEntry, final Path rootDir, boolean isCreate)
  throws IOException {
    Path tableDir = HTableDescriptor.getTableDir(rootDir, logEntry.getKey()
        .getTablename());
    Path regiondir = HRegion.getRegionDir(tableDir,
        HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName()));
    Path dir = getRegionDirRecoveredEditsDir(regiondir);

    // isCreate=false is used by distributed log splitting, which writes into
    // a temporary staging area first and only needs the final name here.
    if (isCreate && !fs.exists(dir)) {
      if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
    }
    return new Path(dir, formatRecoveredEditsFileName(logEntry.getKey()
        .getLogSeqNum()));
  }
static String formatRecoveredEditsFileName(final long seqid) {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java?rev=1188419&r1=1188418&r2=1188419&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java Mon Oct 24 22:30:31 2011
@@ -179,6 +179,30 @@ public class HLogKey implements Writable
return result;
}
  /**
   * Drop this instance's tablename byte array and instead hold a reference to
   * the provided tablename. This is not meant to be a general purpose setter -
   * it's only used to collapse references to conserve memory.
   */
  void internTableName(byte[] tablename) {
    // We should not use this as a setter - only to swap
    // in a new reference to the same table name.
    assert Bytes.equals(tablename, this.tablename);
    this.tablename = tablename;
  }

  /**
   * Drop this instance's region name byte array and instead hold a reference to
   * the provided region name. This is not meant to be a general purpose setter
   * - it's only used to collapse references to conserve memory.
   */
  void internEncodedRegionName(byte[] encodedRegionName) {
    // We should not use this as a setter - only to swap
    // in a new reference to the same region name.
    assert Bytes.equals(this.regionName, encodedRegionName);
    this.regionName = encodedRegionName;
  }
+
public void write(DataOutput out) throws IOException {
Bytes.writeByteArray(out, this.regionName);
Bytes.writeByteArray(out, this.tablename);
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1188419&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Mon Oct 24 22:30:31 2011
@@ -0,0 +1,555 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.apache.hadoop.hbase.util.FSUtils.recoverFileLease;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.hadoop.io.MultipleIOException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * This class is responsible for splitting up a bunch of regionserver commit log
+ * files that are no longer being written to, into new files, one per region for
+ * region to replay on startup. Delete the old log files when finished.
+ */
+public class HLogSplitter {
+
+ private static final String LOG_SPLITTER_IMPL = "hbase.hlog.splitter.impl";
+
+
+ static final Log LOG = LogFactory.getLog(HLogSplitter.class);
+
+
+ // Parameters for split process
+ protected final Path rootDir;
+ protected final Path srcDir;
+ protected final Path oldLogDir;
+ protected final FileSystem fs;
+ protected final Configuration conf;
+
+ // If an exception is thrown by one of the other threads, it will be
+ // stored here.
+ protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
+
+ // Wait/notify for when data has been produced by the reader thread,
+ // consumed by the reader thread, or an exception occurred
+ Object dataAvailable = new Object();
+
+ private MonitoredTask status;
+
+
  /**
   * Create a new HLogSplitter using the given {@link Configuration} and the
   * <code>hbase.hlog.splitter.impl</code> property to derive the instance
   * class to use.
   *
   * @param conf configuration; may name a custom HLogSplitter subclass via
   *        <code>hbase.hlog.splitter.impl</code>
   * @param rootDir hbase directory
   * @param srcDir logs directory
   * @param oldLogDir directory where processed logs are archived to
   * @param fs filesystem the log files live on
   */
  public static HLogSplitter createLogSplitter(Configuration conf,
      final Path rootDir, final Path srcDir,
      Path oldLogDir, final FileSystem fs) {

    @SuppressWarnings("unchecked")
    Class<? extends HLogSplitter> splitterClass = (Class<? extends HLogSplitter>) conf
        .getClass(LOG_SPLITTER_IMPL, HLogSplitter.class);
    try {
      // The configured subclass must expose a constructor matching
      // HLogSplitter's own (Configuration, Path, Path, Path, FileSystem).
      Constructor<? extends HLogSplitter> constructor =
        splitterClass.getConstructor(
          Configuration.class, // conf
          Path.class, // rootDir
          Path.class, // srcDir
          Path.class, // oldLogDir
          FileSystem.class); // fs
      return constructor.newInstance(conf, rootDir, srcDir, oldLogDir, fs);
    } catch (IllegalArgumentException e) {
      throw new RuntimeException(e);
    } catch (InstantiationException e) {
      throw new RuntimeException(e);
    } catch (IllegalAccessException e) {
      throw new RuntimeException(e);
    } catch (InvocationTargetException e) {
      throw new RuntimeException(e);
    } catch (SecurityException e) {
      throw new RuntimeException(e);
    } catch (NoSuchMethodException e) {
      throw new RuntimeException(e);
    }
  }
+
  /**
   * @param conf configuration used for split settings
   * @param rootDir hbase directory
   * @param srcDir logs directory (may be null for single-file splits)
   * @param oldLogDir directory where processed logs are archived to
   * @param fs filesystem the logs live on
   */
  public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
      Path oldLogDir, FileSystem fs) {
    this.conf = conf;
    this.rootDir = rootDir;
    this.srcDir = srcDir;
    this.oldLogDir = oldLogDir;
    this.fs = fs;
  }
+
  /**
   * Splits a HLog file into a temporary staging area. tmpname is used to build
   * the name of the staging area where the recovered-edits will be separated
   * out by region and stored.
   * <p>
   * If the log file has N regions then N recovered.edits files will be
   * produced. There is no buffering in this code. Instead it relies on the
   * buffering in the SequenceFileWriter.
   * <p>
   * @param rootDir hbase root directory
   * @param tmpname name of the staging area
   * @param logfile log file to split
   * @param fs filesystem the log lives on
   * @param conf configuration
   * @param reporter progress callback; may be null
   * @return false if it is interrupted by the progress-able.
   * @throws IOException
   */
  static public boolean splitLogFileToTemp(Path rootDir, String tmpname,
      FileStatus logfile, FileSystem fs,
      Configuration conf, CancelableProgressable reporter) throws IOException {
    // srcDir/oldLogDir are not needed for a single-file split; archiving
    // happens later in moveRecoveredEditsFromTemp.
    HLogSplitter s = new HLogSplitter(conf, rootDir, null, null /* oldLogDir */,
      fs);
    return s.splitLogFileToTemp(logfile, tmpname, reporter);
  }
+
+ public boolean splitLogFileToTemp(FileStatus logfile, String tmpname,
+ CancelableProgressable reporter) throws IOException {
+ final Map<byte[], Object> logWriters = Collections.
+ synchronizedMap(new TreeMap<byte[], Object>(Bytes.BYTES_COMPARATOR));
+ boolean isCorrupted = false;
+
+ Preconditions.checkState(status == null);
+ status = TaskMonitor.get().createStatus(
+ "Splitting log file " + logfile.getPath() +
+ "into a temporary staging area.");
+
+ Object BAD_WRITER = new Object();
+
+ boolean progress_failed = false;
+
+ boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", true);
+ int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
+ // How often to send a progress report (default 1/2 master timeout)
+ int period = conf.getInt("hbase.splitlog.report.period",
+ conf.getInt("hbase.splitlog.manager.timeout",
+ ZKSplitLog.DEFAULT_TIMEOUT) / 2);
+ Path logPath = logfile.getPath();
+ long logLength = logfile.getLen();
+ LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
+ status.setStatus("Opening log file");
+ Reader in = null;
+ try {
+ in = getReader(fs, logfile, conf, skipErrors);
+ } catch (CorruptedLogFileException e) {
+ LOG.warn("Could not get reader, corrupted log file " + logPath, e);
+ ZKSplitLog.markCorrupted(rootDir, tmpname, fs);
+ isCorrupted = true;
+ }
+ if (in == null) {
+ status.markComplete("Was nothing to split in log file");
+ LOG.warn("Nothing to split in log file " + logPath);
+ return true;
+ }
+ long t = EnvironmentEdgeManager.currentTimeMillis();
+ long last_report_at = t;
+ if (reporter != null && reporter.progress() == false) {
+ status.markComplete("Failed: reporter.progress asked us to terminate");
+ return false;
+ }
+ int editsCount = 0;
+ Entry entry;
+ try {
+ while ((entry = getNextLogLine(in,logPath, skipErrors)) != null) {
+ byte[] region = entry.getKey().getRegionName();
+ Object o = logWriters.get(region);
+ if (o == BAD_WRITER) {
+ continue;
+ }
+ WriterAndPath wap = (WriterAndPath)o;
+ if (wap == null) {
+ wap = createWAP(region, entry, rootDir, tmpname, fs, conf);
+ if (wap == null) {
+ // ignore edits from this region. It doesn't exist anymore.
+ // It was probably already split.
+ logWriters.put(region, BAD_WRITER);
+ continue;
+ } else {
+ logWriters.put(region, wap);
+ }
+ }
+ wap.w.append(entry);
+ editsCount++;
+ if (editsCount % interval == 0) {
+ status.setStatus("Split " + editsCount + " edits");
+ long t1 = EnvironmentEdgeManager.currentTimeMillis();
+ if ((t1 - last_report_at) > period) {
+ last_report_at = t;
+ if (reporter != null && reporter.progress() == false) {
+ status.setStatus("Failed: reporter.progress asked us to terminate");
+ progress_failed = true;
+ return false;
+ }
+ }
+ }
+ }
+ } catch (CorruptedLogFileException e) {
+ LOG.warn("Could not parse, corrupted log file " + logPath, e);
+ ZKSplitLog.markCorrupted(rootDir, tmpname, fs);
+ isCorrupted = true;
+ } catch (IOException e) {
+ e = RemoteExceptionHandler.checkIOException(e);
+ throw e;
+ } finally {
+ int n = 0;
+ for (Object o : logWriters.values()) {
+ long t1 = EnvironmentEdgeManager.currentTimeMillis();
+ if ((t1 - last_report_at) > period) {
+ last_report_at = t;
+ if ((progress_failed == false) && (reporter != null) &&
+ (reporter.progress() == false)) {
+ progress_failed = true;
+ }
+ }
+ if (o == BAD_WRITER) {
+ continue;
+ }
+ n++;
+ WriterAndPath wap = (WriterAndPath)o;
+ wap.w.close();
+ LOG.debug("Closed " + wap.p);
+ }
+ String msg = "processed " + editsCount + " edits across " + n +
+ " regions" + " threw away edits for " + (logWriters.size() - n) +
+ " regions" + " log file = " + logPath + " is corrupted = " +
+ isCorrupted + " progress interrupted? = " + progress_failed;
+ LOG.info(msg);
+ status.markComplete(msg);
+ }
+ if (progress_failed) {
+ return false;
+ }
+ return true;
+ }
+
  /**
   * Completes the work done by splitLogFileToTemp by moving the
   * recovered.edits from the staging area to the respective region server's
   * directories.
   * <p>
   * It is invoked by SplitLogManager once it knows that one of the
   * SplitLogWorkers have completed the splitLogFileToTemp() part. If the
   * master crashes then this function might get called multiple times.
   * <p>
   * @param tmpname name of the staging area
   * @param logfile the split log file whose edits are being moved
   * @param conf configuration, used to locate the hbase root dir
   * @throws IOException on filesystem errors
   */
  public static void moveRecoveredEditsFromTemp(String tmpname,
      String logfile, Configuration conf)
  throws IOException{
    Path rootdir = FSUtils.getRootDir(conf);
    Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
    moveRecoveredEditsFromTemp(tmpname, rootdir, oldLogDir, logfile, conf);
  }
+
+ public static void moveRecoveredEditsFromTemp(String tmpname,
+ Path rootdir, Path oldLogDir,
+ String logfile, Configuration conf)
+ throws IOException{
+ List<Path> processedLogs = new ArrayList<Path>();
+ List<Path> corruptedLogs = new ArrayList<Path>();
+ FileSystem fs;
+ fs = rootdir.getFileSystem(conf);
+ Path logPath = new Path(logfile);
+ if (ZKSplitLog.isCorrupted(rootdir, tmpname, fs)) {
+ corruptedLogs.add(logPath);
+ } else {
+ processedLogs.add(logPath);
+ }
+ Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, tmpname);
+ List<FileStatus> files = listAll(fs, stagingDir);
+ for (FileStatus f : files) {
+ Path src = f.getPath();
+ Path dst = ZKSplitLog.stripSplitLogTempDir(rootdir, src);
+ if (ZKSplitLog.isCorruptFlagFile(dst)) {
+ continue;
+ }
+ if (fs.exists(dst)) {
+ fs.delete(dst, false);
+ } else {
+ Path dstdir = dst.getParent();
+ if (!fs.exists(dstdir)) {
+ if (!fs.mkdirs(dstdir)) LOG.warn("mkdir failed on " + dstdir);
+ }
+ }
+ fs.rename(src, dst);
+ LOG.debug(" moved " + src + " => " + dst);
+ }
+ HLog.archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
+ fs.delete(stagingDir, true);
+ return;
+ }
+
+ private static List<FileStatus> listAll(FileSystem fs, Path dir)
+ throws IOException {
+ List<FileStatus> fset = new ArrayList<FileStatus>(100);
+ FileStatus [] files = fs.listStatus(dir);
+ if (files != null) {
+ for (FileStatus f : files) {
+ if (f.isDir()) {
+ fset.addAll(listAll(fs, f.getPath()));
+ } else {
+ fset.add(f);
+ }
+ }
+ }
+ return fset;
+ }
+
  /**
   * Create a new {@link Reader} for reading logs to split.
   *
   * @param fs filesystem holding the log
   * @param file status of the log file to open
   * @param conf configuration
   * @param skipErrors if true, failures to open become
   *        CorruptedLogFileException instead of propagating
   * @return A new Reader instance, or null when the file is empty or hits
   *         EOF immediately (caller treats this as nothing to split)
   * @throws IOException when skipErrors is false and the open fails
   * @throws CorruptedLogFileException when skipErrors is true and the open fails
   */
  protected Reader getReader(FileSystem fs, FileStatus file, Configuration conf,
      boolean skipErrors)
  throws IOException, CorruptedLogFileException {
    Path path = file.getPath();
    long length = file.getLen();
    Reader in;


    // Check for possibly empty file. With appends, currently Hadoop reports a
    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
    // HDFS-878 is committed.
    if (length <= 0) {
      LOG.warn("File " + path + " might be still open, length is 0");
    }

    try {
      recoverFileLease(fs, path, conf);
      try {
        in = getReader(fs, path, conf);
      } catch (EOFException e) {
        if (length <= 0) {
          // TODO should we ignore an empty, not-last log file if skip.errors
          // is false? Either way, the caller should decide what to do. E.g.
          // ignore if this is the last log in sequence.
          // TODO is this scenario still possible if the log has been
          // recovered (i.e. closed)
          LOG.warn("Could not open " + path + " for reading. File is empty", e);
          return null;
        } else {
          // EOFException being ignored
          return null;
        }
      }
    } catch (IOException e) {
      if (!skipErrors) {
        throw e;
      }
      CorruptedLogFileException t =
        new CorruptedLogFileException("skipErrors=true Could not open hlog " +
            path + " ignoring");
      t.initCause(e);
      throw t;
    }
    return in;
  }
+
  /**
   * Read the next entry from the log, classifying expected failure modes.
   *
   * @return the next Entry, or null on EOF (truncated logs are normal after a
   *         regionserver crash, see HBASE-2643) or on a bad-format parse
   *         error that retrying cannot fix
   * @throws CorruptedLogFileException when skipErrors is true and an
   *         unclassified IOException occurs (original cause attached)
   * @throws IOException when skipErrors is false
   */
  static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
  throws CorruptedLogFileException, IOException {
    try {
      return in.next();
    } catch (EOFException eof) {
      // truncated files are expected if a RS crashes (see HBASE-2643)
      LOG.info("EOF from hlog " + path + ". continuing");
      return null;
    } catch (IOException e) {
      // If the IOE resulted from bad file format,
      // then this problem is idempotent and retrying won't help
      if (e.getCause() != null &&
          (e.getCause() instanceof ParseException ||
           e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
        LOG.warn("Parse exception " + e.getCause().toString() + " from hlog "
            + path + ". continuing");
        return null;
      }
      if (!skipErrors) {
        throw e;
      }
      CorruptedLogFileException t =
        new CorruptedLogFileException("skipErrors=true Ignoring exception" +
            " while parsing hlog " + path + ". Marking as corrupted");
      t.initCause(e);
      throw t;
    }
  }
+
+
+
  /**
   * Create a new {@link Writer} for writing log splits.
   * Delegates to HLog.createWriter; subclasses may override to supply a
   * different writer implementation.
   * @throws IOException if the writer cannot be created
   */
  protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
      throws IOException {
    return HLog.createWriter(fs, logfile, conf);
  }
+
  /**
   * Create a new {@link Reader} for reading logs to split.
   * Delegates to HLog.getReader; subclasses may override to supply a
   * different reader implementation.
   * @throws IOException if the reader cannot be created
   */
  protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf)
      throws IOException {
    return HLog.getReader(fs, curLogFile, conf);
  }
+
+
  /**
   * Create a Writer (plus its output Path) for one region's recovered edits.
   *
   * @param tmpname staging-area name; null means a direct (non-distributed)
   *        split writing straight to the final recovered.edits location
   * @return the writer and path, or null when the region's edits path could
   *         not be determined (caller then drops this region's edits)
   * @throws IOException on filesystem errors
   */
  private WriterAndPath createWAP(byte[] region, Entry entry,
      Path rootdir, String tmpname, FileSystem fs, Configuration conf)
      throws IOException {
    Path regionedits = HLog.getRegionSplitEditsPath(fs, entry, rootdir,
        tmpname == null);
    // NOTE(review): the HLog.getRegionSplitEditsPath in this change never
    // returns null - confirm which implementation can, since the caller
    // relies on null meaning "region no longer exists".
    if (regionedits == null) {
      return null;
    }
    if ((tmpname == null) && fs.exists(regionedits)) {
      LOG.warn("Found existing old edits file. It could be the "
          + "result of a previous failed split attempt. Deleting "
          + regionedits + ", length="
          + fs.getFileStatus(regionedits).getLen());
      if (!fs.delete(regionedits, false)) {
        LOG.warn("Failed delete of old " + regionedits);
      }
    }
    Path editsfile;
    if (tmpname != null) {
      // During distributed log splitting the output by each
      // SplitLogWorker is written to a temporary area.
      editsfile = convertRegionEditsToTemp(rootdir, regionedits, tmpname);
    } else {
      editsfile = regionedits;
    }
    Writer w = createWriter(fs, editsfile, conf);
    LOG.debug("Creating writer path=" + editsfile + " region="
        + Bytes.toStringBinary(region));
    return (new WriterAndPath(editsfile, w));
  }
+
  /**
   * Re-root a recovered-edits path under the split-log staging directory for
   * <code>tmpname</code>, preserving the path components below rootdir. Also
   * best-effort deletes any stale file already at the target and creates the
   * target's parent directory (failures there are logged, not thrown).
   */
  Path convertRegionEditsToTemp(Path rootdir, Path edits, String tmpname) {
    // Collect the components of `edits` relative to rootdir (deepest first),
    // then replay them in order under the staging dir.
    List<String> components = new ArrayList<String>(10);
    do {
      components.add(edits.getName());
      edits = edits.getParent();
    } while (edits.depth() > rootdir.depth());
    Path ret = ZKSplitLog.getSplitLogDir(rootdir, tmpname);
    for (int i = components.size() - 1; i >= 0; i--) {
      ret = new Path(ret, components.get(i));
    }
    try {
      if (fs.exists(ret)) {
        LOG.warn("Found existing old temporary edits file. It could be the "
            + "result of a previous failed split attempt. Deleting "
            + ret + ", length="
            + fs.getFileStatus(ret).getLen());
        if (!fs.delete(ret, false)) {
          LOG.warn("Failed delete of old " + ret);
        }
      }
      Path dir = ret.getParent();
      if (!fs.exists(dir)) {
        if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
      }
    } catch (IOException e) {
      LOG.warn("Could not prepare temp staging area ", e);
      // ignore, exceptions will be thrown elsewhere
    }
    return ret;
  }
+
+
+
+
  /**
   * Private data structure pairing a Writer with the Path it writes to.
   * (No statistics are tracked here, despite what an earlier comment said.)
   */
  private final static class WriterAndPath {
    final Path p;
    final Writer w;

    WriterAndPath(final Path p, final Writer w) {
      this.p = p;
      this.w = w;
    }
  }
+
  /**
   * Thrown (with skipErrors=true) when a log file cannot be opened or parsed;
   * callers mark the log corrupted instead of aborting the whole split. The
   * underlying cause is attached via initCause.
   */
  static class CorruptedLogFileException extends Exception {
    private static final long serialVersionUID = 1L;
    CorruptedLogFileException(String s) {
      super(s);
    }
  }
+}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/OrphanHLogAfterSplitException.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/OrphanHLogAfterSplitException.java?rev=1188419&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/OrphanHLogAfterSplitException.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/OrphanHLogAfterSplitException.java Mon Oct 24 22:30:31 2011
@@ -0,0 +1,40 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+
/**
 * Thrown when an HLog is found left behind (orphaned) after a log split has
 * completed.
 */
public class OrphanHLogAfterSplitException extends IOException {

  // IOException is Serializable; pin the version id explicitly so the
  // serialized form does not depend on compiler-generated defaults.
  private static final long serialVersionUID = 1L;

  /**
   * Create this exception without a message
   */
  public OrphanHLogAfterSplitException() {
    super();
  }

  /**
   * Create this exception with a message
   * @param message why it failed
   */
  public OrphanHLogAfterSplitException(String message) {
    super(message);
  }
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java?rev=1188419&r1=1188418&r2=1188419&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java Mon Oct 24 22:30:31 2011
@@ -28,6 +28,7 @@ import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.Writable;
@@ -66,7 +67,7 @@ import org.apache.hadoop.io.Writable;
* is an old style KeyValue or the new style WALEdit.
*
*/
-public class WALEdit implements Writable {
+public class WALEdit implements Writable, HeapSize {
private final int VERSION_2 = -1;
@@ -172,4 +173,17 @@ public class WALEdit implements Writable
return sb.toString();
}
  @Override
  public long heapSize() {
    // Sum of the contained KeyValues plus the scope TreeMap overhead.
    // NOTE(review): this omits the WALEdit object header and the kvs list's
    // own overhead - confirm callers only need a payload-dominated estimate.
    long ret = 0;
    for (KeyValue kv : kvs) {
      ret += kv.heapSize();
    }
    if (scopes != null) {
      ret += ClassSize.TREEMAP;
      ret += ClassSize.align(scopes.size() * ClassSize.MAP_ENTRY);
    }
    return ret;
  }
+
}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/CancelableProgressable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/CancelableProgressable.java?rev=1188419&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/CancelableProgressable.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/CancelableProgressable.java Mon Oct 24 22:30:31 2011
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+/**
+ * Similar interface as {@link org.apache.hadoop.util.Progressable} but returns
+ * a boolean to support canceling the operation.
+ * <p>
+ * Used for doing updating of OPENING znode during log replay on region open.
+ */
+public interface CancelableProgressable {
+
+ /**
+ * Report progress. Returns true if operations should continue, false if the
+ * operation should be canceled and rolled back.
+ * @return whether to continue (true) or cancel (false) the operation
+ */
+ public boolean progress();
+
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java?rev=1188419&r1=1188418&r2=1188419&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java Mon Oct 24 22:30:31 2011
@@ -1,25 +1,23 @@
package org.apache.hadoop.hbase.zookeeper;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.ZooKeeper.States;
-
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
/**
* http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling
@@ -396,6 +394,13 @@ public class RecoverableZooKeeper {
}
}
+ /**
+ * Asynchronously creates the given znode. The payload is first passed
+ * through appendMetaData() (the inverse of removeMetaData()) before being
+ * handed to the underlying ZooKeeper client; the create result is delivered
+ * via {@code cb} with {@code ctx} passed back verbatim.
+ * NOTE(review): unlike the synchronous create() below, no retry handling is
+ * applied here — confirm callers expect that.
+ */
+ public void asyncCreate(String path, byte[] data, List<ACL> acl,
+ CreateMode createMode, final AsyncCallback.StringCallback cb,
+ final Object ctx) {
+ byte[] newData = appendMetaData(data);
+ zk.create(path, newData, acl, createMode, cb, ctx);
+ }
+
/**
* <p>
* NONSEQUENTIAL create is idempotent operation.
@@ -423,11 +428,11 @@ public class RecoverableZooKeeper {
switch (createMode) {
case EPHEMERAL:
case PERSISTENT:
- return createNonSequential(path, newData, acl, createMode);
+ return createNonSequential(path, newData, acl, createMode);
case EPHEMERAL_SEQUENTIAL:
case PERSISTENT_SEQUENTIAL:
- return createSequential(path, newData, acl, createMode);
+ return createSequential(path, newData, acl, createMode);
default:
throw new IllegalArgumentException("Unrecognized CreateMode: " +
@@ -541,7 +546,7 @@ public class RecoverableZooKeeper {
return null;
}
- public byte[] removeMetaData(byte[] data) {
+ public static byte[] removeMetaData(byte[] data) {
if(data == null || data.length == 0) {
return data;
}
@@ -625,4 +630,9 @@ public class RecoverableZooKeeper {
return lockChildren;
}
+ @Override
+ public String toString() {
+ return "RZK{identifier=>" + identifier + "}";
+ }
+
}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java?rev=1188419&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java Mon Oct 24 22:30:31 2011
@@ -0,0 +1,273 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.master.SplitLogManager;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Common methods and attributes used by {@link SplitLogManager} and
+ * {@link SplitLogWorker}
+ */
+public class ZKSplitLog {
+  private static final Log LOG = LogFactory.getLog(ZKSplitLog.class);
+
+  public static final int DEFAULT_TIMEOUT = 25000; // 25 sec
+  public static final int DEFAULT_ZK_RETRIES = 3;
+  public static final int DEFAULT_MAX_RESUBMIT = 3;
+  public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); //3 min
+
+  /** Name of the marker file used to flag a split-log directory as corrupt. */
+  private static final String CORRUPT_FLAG = "corrupt";
+
+  /**
+   * Gets the full path node name for the log file being split.
+   * @param zkw zk reference
+   * @param filename log file name (only the basename)
+   * @return full znode path for the task representing this file
+   */
+  public static String getEncodedNodeName(ZooKeeperWrapper zkw,
+      String filename) {
+    // The filename is URL-encoded so it is always safe to embed as a single
+    // znode path component.
+    return zkw.getZNode(zkw.splitLogZNode, encode(filename));
+  }
+
+  /**
+   * Inverse of {@link #getEncodedNodeName(ZooKeeperWrapper, String)}: strips
+   * the parent path and URL-decodes the final path component.
+   * @param node full znode path of a split-log task
+   * @return the original log file basename
+   */
+  public static String getFileName(String node) {
+    String basename = node.substring(node.lastIndexOf('/') + 1);
+    return decode(basename);
+  }
+
+  /** URL-encodes {@code s} using UTF-8. */
+  public static String encode(String s) {
+    try {
+      return URLEncoder.encode(s, "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      // UTF-8 support is mandated by the Java platform; chain the cause so a
+      // broken JVM is still diagnosable (the original dropped it).
+      throw new RuntimeException("URLENCODER doesn't support UTF-8", e);
+    }
+  }
+
+  /** URL-decodes {@code s} using UTF-8. */
+  public static String decode(String s) {
+    try {
+      return URLDecoder.decode(s, "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException("URLDecoder doesn't support UTF-8", e);
+    }
+  }
+
+  /**
+   * @return the znode under the split-log dir used to trigger a rescan
+   */
+  public static String getRescanNode(ZooKeeperWrapper zkw) {
+    return zkw.getZNode(zkw.splitLogZNode, "RESCAN");
+  }
+
+  /**
+   * @return true if {@code path} begins with the rescan-node name (a suffix
+   * may follow the prefix and still match)
+   */
+  public static boolean isRescanNode(ZooKeeperWrapper zkw, String path) {
+    // Equivalent to the original char-by-char prefix comparison.
+    return path.startsWith(getRescanNode(zkw));
+  }
+
+  /**
+   * @return true if {@code path} is a direct child of the split-log znode
+   */
+  public static boolean isTaskPath(ZooKeeperWrapper zkw, String path) {
+    String dirname = path.substring(0, path.lastIndexOf('/'));
+    return dirname.equals(zkw.splitLogZNode);
+  }
+
+  /**
+   * States a split-log task znode moves through. The serialized payload is
+   * "&lt;state&gt; &lt;servername&gt;"; see {@link #get(String)}.
+   */
+  public static enum TaskState {
+    TASK_UNASSIGNED("unassigned"),
+    TASK_OWNED("owned"),
+    TASK_RESIGNED("resigned"),
+    TASK_DONE("done"),
+    TASK_ERR("err");
+
+    // Serialized state name; written as the prefix of the task znode payload.
+    private final byte[] state;
+    private TaskState(String s) {
+      state = s.getBytes();
+    }
+
+    /**
+     * @param serverName name of the server writing this state
+     * @return the znode payload for this state written by {@code serverName}
+     */
+    public byte[] get(String serverName) {
+      return (Bytes.add(state, " ".getBytes(), serverName.getBytes()));
+    }
+
+    /**
+     * @param data znode payload previously produced by {@link #get(String)}
+     * @return the server-name portion of the payload
+     */
+    public String getWriterName(byte[] data) {
+      String str = Bytes.toString(data);
+      return str.substring(str.indexOf(' ') + 1);
+    }
+
+    /**
+     * NOTE: this overloads — does not override — {@link Object#equals(Object)}.
+     * @param s serialized payload to test
+     * @return True if {@link #state} is a prefix of s. False otherwise.
+     */
+    public boolean equals(byte[] s) {
+      if (s.length < state.length) {
+        return false;
+      }
+      for (int i = 0; i < state.length; i++) {
+        if (state[i] != s[i]) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    /**
+     * @return true if {@code s} is exactly this state as written by
+     * {@code serverName}
+     */
+    public boolean equals(byte[] s, String serverName) {
+      return (Arrays.equals(s, get(serverName)));
+    }
+
+    @Override
+    public String toString() {
+      return new String(state);
+    }
+  }
+
+  /**
+   * @return the temporary directory used while splitting:
+   * &lt;rootdir&gt;/splitlog/&lt;tmpname&gt;
+   */
+  public static Path getSplitLogDir(Path rootdir, String tmpname) {
+    return new Path(new Path(rootdir, "splitlog"), tmpname);
+  }
+
+  /**
+   * Maps a file under &lt;rootdir&gt;/splitlog/&lt;tmpname&gt;/... back to the
+   * corresponding final path directly under &lt;rootdir&gt;/... by dropping
+   * the two temporary path components.
+   */
+  public static Path stripSplitLogTempDir(Path rootdir, Path file) {
+    // Everything deeper than rootdir + "splitlog" + tmpname is preserved.
+    int skipDepth = rootdir.depth() + 2;
+    List<String> components = new ArrayList<String>(10);
+    do {
+      components.add(file.getName());
+      file = file.getParent();
+    } while (file.depth() > skipDepth);
+    Path ret = rootdir;
+    for (int i = components.size() - 1; i >= 0; i--) {
+      ret = new Path(ret, components.get(i));
+    }
+    return ret;
+  }
+
+  /**
+   * @return the per-worker, per-file temp dir component:
+   * "&lt;worker&gt;_&lt;encoded file&gt;"
+   */
+  public static String getSplitLogDirTmpComponent(String worker, String file) {
+    return (worker + "_" + ZKSplitLog.encode(file));
+  }
+
+  /**
+   * Flags the log being split in {@code tmpname} as corrupted by creating a
+   * marker file in its split-log directory.
+   * @throws IOException if the marker file cannot be created
+   */
+  public static void markCorrupted(Path rootdir, String tmpname, FileSystem fs)
+      throws IOException {
+    Path file = new Path(getSplitLogDir(rootdir, tmpname), CORRUPT_FLAG);
+    try {
+      fs.createNewFile(file);
+    } catch (IOException e) {
+      LOG.error("Could not flag a log file as corrupted. Failed to create "
+          + file);
+      throw e;
+    }
+  }
+
+  /**
+   * @return true if {@link #markCorrupted} was called for {@code tmpname}
+   * @throws IOException on filesystem error
+   */
+  public static boolean isCorrupted(Path rootdir, String tmpname,
+      FileSystem fs) throws IOException {
+    return fs.exists(new Path(getSplitLogDir(rootdir, tmpname), CORRUPT_FLAG));
+  }
+
+  /**
+   * @return true if {@code file} is a corrupt-flag marker file
+   */
+  public static boolean isCorruptFlagFile(Path file) {
+    return file.getName().equals(CORRUPT_FLAG);
+  }
+
+  /**
+   * Global counters shared by SplitLogManager and SplitLogWorker. Every
+   * declared field of this class is expected to be an AtomicLong counter;
+   * {@link #resetCounters()} skips anything else defensively.
+   */
+  public static class Counters {
+    //SplitLogManager counters
+    public static AtomicLong tot_mgr_log_split_batch_start = new AtomicLong(0);
+    public static AtomicLong tot_mgr_log_split_batch_success =
+        new AtomicLong(0);
+    public static AtomicLong tot_mgr_log_split_batch_err = new AtomicLong(0);
+    public static AtomicLong tot_mgr_new_unexpected_hlogs = new AtomicLong(0);
+    public static AtomicLong tot_mgr_log_split_start = new AtomicLong(0);
+    public static AtomicLong tot_mgr_log_split_success = new AtomicLong(0);
+    public static AtomicLong tot_mgr_log_split_err = new AtomicLong(0);
+    public static AtomicLong tot_mgr_node_create_queued = new AtomicLong(0);
+    public static AtomicLong tot_mgr_node_create_result = new AtomicLong(0);
+    public static AtomicLong tot_mgr_node_already_exists = new AtomicLong(0);
+    public static AtomicLong tot_mgr_node_create_err = new AtomicLong(0);
+    public static AtomicLong tot_mgr_node_create_retry = new AtomicLong(0);
+    public static AtomicLong tot_mgr_get_data_queued = new AtomicLong(0);
+    public static AtomicLong tot_mgr_get_data_result = new AtomicLong(0);
+    public static AtomicLong tot_mgr_get_data_err = new AtomicLong(0);
+    public static AtomicLong tot_mgr_get_data_retry = new AtomicLong(0);
+    public static AtomicLong tot_mgr_node_delete_queued = new AtomicLong(0);
+    public static AtomicLong tot_mgr_node_delete_result = new AtomicLong(0);
+    public static AtomicLong tot_mgr_node_delete_err = new AtomicLong(0);
+    public static AtomicLong tot_mgr_resubmit = new AtomicLong(0);
+    public static AtomicLong tot_mgr_resubmit_failed = new AtomicLong(0);
+    public static AtomicLong tot_mgr_null_data = new AtomicLong(0);
+    public static AtomicLong tot_mgr_orphan_task_acquired = new AtomicLong(0);
+    public static AtomicLong tot_mgr_unacquired_orphan_done = new AtomicLong(0);
+    public static AtomicLong tot_mgr_resubmit_threshold_reached =
+        new AtomicLong(0);
+    public static AtomicLong tot_mgr_missing_state_in_delete =
+        new AtomicLong(0);
+    public static AtomicLong tot_mgr_heartbeat = new AtomicLong(0);
+    public static AtomicLong tot_mgr_rescan = new AtomicLong(0);
+    public static AtomicLong tot_mgr_rescan_deleted = new AtomicLong(0);
+    public static AtomicLong tot_mgr_task_deleted = new AtomicLong(0);
+    public static AtomicLong tot_mgr_resubmit_unassigned = new AtomicLong(0);
+    public static AtomicLong tot_mgr_relist_logdir = new AtomicLong(0);
+    public static AtomicLong tot_mgr_resubmit_dead_server_task =
+        new AtomicLong(0);
+
+    // SplitLogWorker counters
+    public static AtomicLong tot_wkr_failed_to_grab_task_no_data =
+        new AtomicLong(0);
+    public static AtomicLong tot_wkr_failed_to_grab_task_exception =
+        new AtomicLong(0);
+    public static AtomicLong tot_wkr_failed_to_grab_task_owned =
+        new AtomicLong(0);
+    public static AtomicLong tot_wkr_failed_to_grab_task_lost_race =
+        new AtomicLong(0);
+    public static AtomicLong tot_wkr_task_acquired = new AtomicLong(0);
+    public static AtomicLong tot_wkr_task_resigned = new AtomicLong(0);
+    public static AtomicLong tot_wkr_task_done = new AtomicLong(0);
+    public static AtomicLong tot_wkr_task_err = new AtomicLong(0);
+    public static AtomicLong tot_wkr_task_heartbeat = new AtomicLong(0);
+    public static AtomicLong tot_wkr_task_acquired_rescan = new AtomicLong(0);
+    public static AtomicLong tot_wkr_get_data_queued = new AtomicLong(0);
+    public static AtomicLong tot_wkr_get_data_result = new AtomicLong(0);
+    public static AtomicLong tot_wkr_get_data_retry = new AtomicLong(0);
+    public static AtomicLong tot_wkr_preempt_task = new AtomicLong(0);
+    public static AtomicLong tot_wkr_task_heartbeat_failed = new AtomicLong(0);
+    public static AtomicLong tot_wkr_final_transistion_failed =
+        new AtomicLong(0);
+
+    /**
+     * Resets every counter to 0. Intended for tests.
+     * @throws Exception on reflective access failure
+     */
+    public static void resetCounters() throws Exception {
+      // Use Counters.class directly instead of instantiating a throwaway
+      // object, and skip any field that is not an AtomicLong (the original
+      // blindly cast every declared field, which would throw if a
+      // non-counter field were ever added).
+      for (Field fld : Counters.class.getDeclaredFields()) {
+        if (AtomicLong.class.isAssignableFrom(fld.getType())) {
+          ((AtomicLong) fld.get(null)).set(0);
+        }
+      }
+    }
+  }
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java?rev=1188419&r1=1188418&r2=1188419&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java Mon Oct 24 22:30:31 2011
@@ -20,6 +20,8 @@
package org.apache.hadoop.hbase.zookeeper;
import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.InterruptedIOException;
@@ -29,7 +31,10 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Calendar;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -39,9 +44,6 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.Calendar;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -52,14 +54,16 @@ import org.apache.hadoop.hbase.HServerIn
import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Stat;
@@ -141,6 +145,10 @@ public class ZooKeeperWrapper implements
*/
private final String rgnsInTransitZNode;
/*
+ * ZNode used for log splitting work assignment
+ */
+ public final String splitLogZNode;
+ /*
* List of ZNodes in the unassgined region that are already being watched
*/
private Set<String> unassignedZNodesWatched = new HashSet<String>();
@@ -201,6 +209,7 @@ public class ZooKeeperWrapper implements
String masterAddressZNodeName = conf.get("zookeeper.znode.master", "master");
String stateZNodeName = conf.get("zookeeper.znode.state", "shutdown");
String regionsInTransitZNodeName = conf.get("zookeeper.znode.regionInTransition", "UNASSIGNED");
+ String splitLogZNodeName = conf.get("zookeeper.znode.splitlog", "splitlog");
rootRegionZNode = getZNode(parentZNode, rootServerZNodeName);
rsZNode = getZNode(parentZNode, rsZNodeName);
@@ -209,8 +218,9 @@ public class ZooKeeperWrapper implements
clusterStateZNode = getZNode(parentZNode, stateZNodeName);
int retryNum = conf.getInt("zookeeper.connection.retry.num", 6);
int retryFreq = conf.getInt("zookeeper.connection.retry.freq", 1000);
- zkDumpConnectionTimeOut = conf.getInt("zookeeper.dump.connection.timeout",
+ zkDumpConnectionTimeOut = conf.getInt("zookeeper.dump.connection.timeout",
1000);
+ splitLogZNode = getZNode(parentZNode, splitLogZNodeName);
connectToZk(retryNum,retryFreq);
}
@@ -1002,22 +1012,33 @@ public class ZooKeeperWrapper implements
}
public byte[] getData(String parentZNode, String znode) {
- return getDataAndWatch(parentZNode, znode, null);
+ return getData(parentZNode, znode, null);
+ }
+
+ public byte[] getData(String parentZNode, String znode, Stat stat) {
+ return getDataAndWatch(parentZNode, znode, null, stat);
}
- public byte[] getDataAndWatch(String parentZNode,
- String znode, Watcher watcher) {
+ public byte[] getDataAndWatch(String parentZNode, String znode,
+ Watcher watcher) {
+ return getDataAndWatch(parentZNode, znode, watcher, null);
+ }
+
+ public byte[] getDataAndWatch(String parentZNode, String znode,
+ Watcher watcher, Stat stat) {
byte[] data = null;
try {
- String path = joinPath(parentZNode, znode);
- // TODO: ZK-REFACTOR: remove existance check?
+ String path = getZNode(parentZNode, znode);
+ // TODO: ZK-REFACTOR: remove existence check?
if (checkExistenceOf(path)) {
- data = recoverableZK.getData(path, watcher, null);
+ data = recoverableZK.getData(path, watcher, stat);
}
} catch (KeeperException e) {
- LOG.warn("<" + instanceName + ">" + "Failed to read " + znode + " znode in ZooKeeper: " + e);
+ LOG.warn("<" + instanceName + ">" + "Failed to read " + znode
+ + " znode in ZooKeeper: " + e);
} catch (InterruptedException e) {
- LOG.warn("<" + instanceName + ">" + "Failed to read " + znode + " znode in ZooKeeper: " + e);
+ LOG.warn("<" + instanceName + ">" + "Failed to read " + znode
+ + " znode in ZooKeeper: " + e);
}
return data;
}
@@ -1164,10 +1185,10 @@ public class ZooKeeperWrapper implements
LOG.error("<" + instanceName + ">" + "Failed to create ZNode " + fullyQualifiedZNodeName + " in ZooKeeper", e);
return null;
}
- // watch the znode for deletion, data change, creation of children
- if(watch) {
- watchZNode(zNodeName);
- }
+ // watch the znode for deletion, data change, creation of children
+ if (watch) {
+ watchZNode(zNodeName);
+ }
return fullyQualifiedZNodeName;
}
@@ -1371,6 +1392,289 @@ public class ZooKeeperWrapper implements
return newNodes;
}
+ /**
+  * Check if the specified node exists. Sets no watches.
+  *
+  * Returns the node's version if it exists, -1 if it does not. Unexpected
+  * ZooKeeper errors are rethrown via keeperException().
+  *
+  * @param znode path of node to check
+  * @return version of the node if it exists, -1 if does not exist
+  * @throws KeeperException if unexpected zookeeper exception
+  */
+ public int checkExists(String znode)
+     throws KeeperException {
+   try {
+     Stat s = recoverableZK.exists(znode, null);
+     return s != null ? s.getVersion() : -1;
+   } catch (KeeperException e) {
+     // Fixed log text: this call sets no watcher (the old message was a
+     // copy-paste from watchAndCheckExists and was misleading).
+     LOG.warn(recoverableZK + " Unable to check existence of znode (" + znode + ")", e);
+     keeperException(e);
+     return -1;
+   } catch (InterruptedException e) {
+     LOG.warn(recoverableZK + " Unable to check existence of znode (" + znode + ")", e);
+     interruptedException(e);
+     return -1;
+   }
+ }
+
+ /**
+  * Watch the specified znode for delete/create/change events. The watcher is
+  * set whether or not the node exists. If the node already exists, the method
+  * returns true. If the node does not exist, the method returns false.
+  *
+  * @param znode path of node to watch
+  * @return true if znode exists, false if does not exist or error
+  * @throws KeeperException if unexpected zookeeper exception
+  */
+ public boolean watchAndCheckExists(String znode)
+     throws KeeperException {
+   try {
+     final Stat stat = recoverableZK.exists(znode, this);
+     LOG.debug(this + " Set watcher on existing znode " + znode);
+     return stat != null;
+   } catch (KeeperException ke) {
+     LOG.warn(this + " Unable to set watcher on znode " + znode, ke);
+     keeperException(ke);
+     return false;
+   } catch (InterruptedException ie) {
+     LOG.warn(this + " Unable to set watcher on znode " + znode, ie);
+     interruptedException(ie);
+     return false;
+   }
+ }
+
+ /**
+  * Lists the children of the specified znode without setting any watches.
+  *
+  * Best effort only: no watches are set by this call.
+  *
+  * Returns an empty list if the node has no children; returns null if the
+  * parent node itself does not exist.
+  *
+  * @param znode node to get children of
+  * @return list of children of specified znode, empty if no children,
+  *         null if parent does not exist
+  * @throws KeeperException if unexpected zookeeper exception
+  */
+ public List<String> listChildrenNoWatch(String znode)
+     throws KeeperException {
+   try {
+     // Null watcher: list the children without watching.
+     return recoverableZK.getChildren(znode, null);
+   } catch (KeeperException.NoNodeException nne) {
+     // Missing parent is part of the contract: report it as null.
+     return null;
+   } catch (InterruptedException ie) {
+     interruptedException(ie);
+     return null;
+   }
+ }
+
+ /**
+  * Lists the children znodes of the specified znode and sets a watch on the
+  * znode itself. The watch captures NodeDeleted on the znode as well as
+  * NodeChildrenChanged when any of its children are created or deleted.
+  *
+  * Returns null if the specified node does not exist; otherwise returns the
+  * (possibly empty) list of its children.
+  *
+  * @param znode path of node to list and watch children of
+  * @return list of children of the specified node, an empty list if the node
+  *         exists but has no children, and null if the node does not exist
+  * @throws KeeperException if unexpected zookeeper exception
+  */
+ public List<String> listChildrenAndWatchForNewChildren(String znode)
+     throws KeeperException {
+   try {
+     return recoverableZK.getChildren(znode, this);
+   } catch (KeeperException.NoNodeException ke) {
+     LOG.debug(recoverableZK + " Unable to list children of znode " + znode +
+         " because node does not exist (not an error)");
+     return null;
+   } catch (KeeperException ke) {
+     LOG.warn(recoverableZK + " Unable to list children of znode " + znode, ke);
+     keeperException(ke);
+     return null;
+   } catch (InterruptedException ie) {
+     LOG.warn(recoverableZK + " Unable to list children of znode " + znode, ie);
+     interruptedException(ie);
+     return null;
+   }
+ }
+
+ /**
+ * Sets the data of the existing znode to be the specified data. The node
+ * must exist; no checks are done on the existing data or version (the
+ * expected version passed down is -1, i.e. "any").
+ *
+ * <p>If the node does not exist, a {@link NoNodeException} will be thrown.
+ *
+ * <p>No watches are set, but setting data will trigger other watchers of
+ * this node.
+ *
+ * <p>If there is another problem, a KeeperException will be thrown.
+ *
+ * @param znode path of node
+ * @param data data to set for node
+ * @return true if data was set
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public boolean setData(String znode, byte[] data)
+ throws KeeperException, KeeperException.NoNodeException {
+ return setData(znode, data, -1);
+ }
+
+ /**
+ * Sets the data of the existing znode to be the specified data. Ensures that
+ * the current data has the specified expected version.
+ *
+ * <p>If the node does not exist, a {@link NoNodeException} will be thrown.
+ *
+ * <p>If there is a version mismatch, this method returns false.
+ *
+ * <p>No watches are set, but setting data will trigger other watchers of
+ * this node.
+ *
+ * <p>If there is another problem, a KeeperException will be thrown.
+ *
+ * @param znode path of node
+ * @param data data to set for node
+ * @param expectedVersion version expected when setting data
+ * @return true if data set, false if version mismatch
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public boolean setData(String znode, byte [] data, int expectedVersion)
+ throws KeeperException, KeeperException.NoNodeException {
+ try {
+ return recoverableZK.setData(znode, data, expectedVersion) != null;
+ } catch (InterruptedException e) {
+ interruptedException(e);
+ return false;
+ }
+ }
+
+ /**
+ * Sets the data of the existing znode to be the specified data, ensuring
+ * that the current data has the specified expected version, and returns the
+ * resulting Stat.
+ *
+ * <p>If the node does not exist, a {@link NoNodeException} will be thrown.
+ *
+ * <p>On a version mismatch a KeeperException is expected from ZooKeeper
+ * (BadVersionException) rather than a null return — NOTE(review): confirm
+ * RecoverableZooKeeper does not remap this.
+ *
+ * <p>No watches are set, but setting data will trigger other watchers of
+ * this node.
+ *
+ * @param znode path of node
+ * @param data data to set for node
+ * @param expectedVersion version expected when setting data
+ * @return stat returned by setData
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public Stat setDataGetStat(String znode, byte [] data, int expectedVersion)
+ throws KeeperException, KeeperException.NoNodeException, InterruptedException {
+ return recoverableZK.setData(znode, data, expectedVersion);
+ }
+
+ /**
+ * Asynchronously creates the specified node with the specified data.
+ *
+ * <p>The node is created with open access (OPEN_ACL_UNSAFE) and the given
+ * {@code createMode}. This method itself throws nothing: all outcomes,
+ * including "node already exists", are reported through the callback.
+ *
+ * @param znode path of node to create
+ * @param data data of node to create
+ * @param createMode persistence/sequence mode for the new node
+ * @param cb async callback invoked with the create result
+ * @param ctx opaque context object passed back to the callback
+ */
+ public void asyncCreate(String znode, byte[] data, CreateMode createMode,
+ final AsyncCallback.StringCallback cb, final Object ctx) {
+ recoverableZK.asyncCreate(znode, data, Ids.OPEN_ACL_UNSAFE,
+ createMode, cb, ctx);
+ }
+
+ /**
+  * Delete the specified node and all of its children.
+  *
+  * Sets no watches. If the node does not exist this is a no-op. Throws all
+  * exceptions besides those dealing with deletion of children.
+  *
+  * @param node znode path to delete recursively
+  * @throws KeeperException if unexpected zookeeper exception
+  */
+ public void deleteNodeRecursively(String node)
+     throws KeeperException {
+   try {
+     List<String> children = listChildrenNoWatch(node);
+     // listChildrenNoWatch returns null when the node does not exist; the
+     // original dereferenced it unconditionally and would NPE here. Mirror
+     // the null check already done in deleteChildrenRecursively.
+     if (children != null && !children.isEmpty()) {
+       for (String child : children) {
+         deleteNodeRecursively(joinPath(node, child));
+       }
+     }
+     // Version -1 deletes regardless of the node's current version.
+     recoverableZK.delete(node, -1);
+   } catch (InterruptedException ie) {
+     interruptedException(ie);
+   }
+ }
+
+ /**
+ * Delete all the children of the specified node but not the node itself.
+ *
+ * Sets no watches. If the node does not exist this is a no-op. Throws all
+ * exceptions besides those dealing with deletion of children.
+ *
+ * @param node znode whose children should be deleted
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public void deleteChildrenRecursively(String node)
+ throws KeeperException {
+ List<String> children = listChildrenNoWatch(node);
+ if (children != null && !children.isEmpty()) {
+ for (String child : children) {
+ deleteNodeRecursively(joinPath(node, child));
+ }
+ }
+ }
+
+ /**
+ * Handles InterruptedExceptions in client calls.
+ * <p>
+ * This may be temporary but for now this gives one place to deal with these.
+ * <p>
+ * Currently this only logs at DEBUG and restores the thread's interrupt
+ * status; callers continue as if nothing happened.
+ * TODO: Is this ever expected to happen? Should it abort, or be logged at
+ * WARN instead?
+ *
+ * @param ie the caught InterruptedException
+ */
+ public void interruptedException(InterruptedException ie) {
+ LOG.debug(recoverableZK
+ + " Received InterruptedException, doing nothing here", ie);
+ // At least preserve the interrupt status for the caller.
+ Thread.currentThread().interrupt();
+ // no-op otherwise
+ }
+
+ /**
+ * Handles KeeperExceptions in client calls.
+ * <p>
+ * This may be temporary but for now this gives one place to deal with these.
+ * <p>
+ * Currently this logs the exception at ERROR and rethrows it so the caller
+ * handles it; this method therefore never returns normally.
+ *
+ * @param ke the caught KeeperException
+ * @throws KeeperException always — {@code ke} is rethrown
+ */
+ public void keeperException(KeeperException ke) throws KeeperException {
+ LOG.error(recoverableZK
+ + " Received unexpected KeeperException, re-throwing exception", ke);
+ throw ke;
+ }
+
public static class ZNodePathAndData {
private String zNodePath;
private byte[] data;
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1188419&r1=1188418&r2=1188419&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Mon Oct 24 22:30:31 2011
@@ -418,6 +418,27 @@ public class HBaseTestingUtility {
return createTable(tableName, new byte[][]{family});
}
+
+ /**
+ * Creates a table pre-split into {@code numRegions} regions whose row keys
+ * span [startKey, endKey], then returns an HTable handle connected to it.
+ * Each family is created keeping {@code numVersions} versions; the other
+ * HColumnDescriptor arguments are the class defaults except one
+ * Integer.MAX_VALUE positional argument — NOTE(review): confirm which
+ * constructor parameter that maps to in this HColumnDescriptor version.
+ *
+ * @param tableName name of the table to create
+ * @param families column families to create
+ * @param numVersions max versions kept per column family
+ * @param startKey first region boundary key
+ * @param endKey last region boundary key
+ * @param numRegions number of regions to pre-split into
+ * @return an HTable for the newly created table
+ * @throws IOException if table creation fails
+ */
+ public HTable createTable(byte[] tableName, byte[][] families,
+ int numVersions, byte[] startKey, byte[] endKey, int numRegions)
+ throws IOException{
+ HTableDescriptor desc = new HTableDescriptor(tableName);
+ for (byte[] family : families) {
+ HColumnDescriptor hcd = new HColumnDescriptor(family, numVersions,
+ HColumnDescriptor.DEFAULT_COMPRESSION,
+ HColumnDescriptor.DEFAULT_IN_MEMORY,
+ HColumnDescriptor.DEFAULT_BLOCKCACHE,
+ Integer.MAX_VALUE, HColumnDescriptor.DEFAULT_TTL,
+ HColumnDescriptor.DEFAULT_BLOOMFILTER,
+ HColumnDescriptor.DEFAULT_REPLICATION_SCOPE);
+ desc.addFamily(hcd);
+ }
+ (new HBaseAdmin(getConfiguration())).createTable(desc, startKey,
+ endKey, numRegions);
+ return new HTable(getConfiguration(), tableName);
+ }
+
+
/**
* Create a table.
* @param tableName
Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java?rev=1188419&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java Mon Oct 24 22:30:31 2011
@@ -0,0 +1,412 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.OrphanHLogAfterSplitException;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+
+public class TestDistributedLogSplitting {
+ private static final Log LOG = LogFactory.getLog(TestDistributedLogSplitting.class);
+ static {
+ Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
+ }
+
+ final int NUM_RS = 6;
+
+ MiniHBaseCluster cluster;
+ HMaster master;
+ Configuration conf;
+ HBaseTestingUtility TEST_UTIL;
+
+ @Before
+ public void before() throws Exception {
+
+ }
+
+ private void startCluster(int num_rs) throws Exception{
+ ZKSplitLog.Counters.resetCounters();
+ LOG.info("Starting cluster");
+ conf = HBaseConfiguration.create();
+ conf.setInt("hbase.regionserver.info.port", -1);
+ conf.setFloat("hbase.regions.slop", (float)100.0); // no load balancing
+ conf.setBoolean("hbase.master.distributed.log.splitting", true);
+ TEST_UTIL = new HBaseTestingUtility(conf);
+ cluster = TEST_UTIL.startMiniCluster(num_rs);
+ int live_rs;
+ while ((live_rs = cluster.getLiveRegionServerThreads().size()) < num_rs) {
+ LOG.info(live_rs + " out of " + num_rs + " started, waiting ...");
+ Thread.sleep(500);
+ }
+ master = cluster.getMaster();
+ while (!master.isActiveMaster() || master.isClosed() || !master.getIsSplitLogAfterStartupDone()) {
+ LOG.info("waiting for master to be ready");
+ Thread.sleep(500);
+ }
+ }
+
+ @After
+ public void after() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testThreeRSAbort() throws Exception {
+ LOG.info("testThreeRSAbort");
+ final int NUM_REGIONS_TO_CREATE = 40;
+ final int NUM_ROWS_PER_REGION = 100;
+
+ startCluster(NUM_RS);
+
+
+ HTable ht = installTable("table", "family", NUM_REGIONS_TO_CREATE);
+ populateDataInTable(NUM_ROWS_PER_REGION, "family");
+
+
+ List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+ assertEquals(NUM_RS, rsts.size());
+ rsts.get(0).getRegionServer().abort("testing");
+ rsts.get(1).getRegionServer().abort("testing");
+ rsts.get(2).getRegionServer().abort("testing");
+
+ long start = EnvironmentEdgeManager.currentTimeMillis();
+ while (cluster.getLiveRegionServerThreads().size() > (NUM_RS - 3)) {
+ if (EnvironmentEdgeManager.currentTimeMillis() - start > 60000) {
+ assertTrue(false);
+ }
+ Thread.sleep(200);
+ }
+
+ start = EnvironmentEdgeManager.currentTimeMillis();
+ while (getAllOnlineRegions(cluster).size() < (NUM_REGIONS_TO_CREATE + 2)) {
+ if (EnvironmentEdgeManager.currentTimeMillis() - start > 60000) {
+ assertTrue(false);
+ }
+ Thread.sleep(200);
+ }
+
+ assertEquals(NUM_REGIONS_TO_CREATE * NUM_ROWS_PER_REGION,
+ TEST_UTIL.countRows(ht));
+ }
+
+ @Test(expected=OrphanHLogAfterSplitException.class)
+ public void testOrphanLogCreation() throws Exception {
+ LOG.info("testOrphanLogCreation");
+ startCluster(NUM_RS);
+ final SplitLogManager slm = master.getSplitLogManager();
+ final FileSystem fs = master.getFileSystem();
+
+ List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+ HRegionServer hrs = rsts.get(0).getRegionServer();
+ Path rootdir = FSUtils.getRootDir(conf);
+ final Path logDir = new Path(rootdir,
+ HLog.getHLogDirectoryName(hrs.getServerInfo().getServerName()));
+
+ installTable("table", "family", 40);
+
+ makeHLog(hrs.getLog(), hrs.getOnlineRegions(), "table",
+ 1000, 100);
+
+ new Thread() {
+ public void run() {
+ while (true) {
+ int i = 0;
+ try {
+ while(ZKSplitLog.Counters.tot_mgr_log_split_batch_start.get() ==
+ 0) {
+ Thread.yield();
+ }
+ fs.createNewFile(new Path(logDir, "foo" + i++));
+ } catch (Exception e) {
+ LOG.debug("file creation failed", e);
+ return;
+ }
+ }
+ }
+ }.start();
+ slm.splitLogDistributed(logDir);
+ FileStatus[] files = fs.listStatus(logDir);
+ if (files != null) {
+ for (FileStatus file : files) {
+ LOG.debug("file still there " + file.getPath());
+ }
+ }
+ }
+
+ @Test
+ public void testRecoveredEdits() throws Exception {
+ LOG.info("testRecoveredEdits");
+ startCluster(NUM_RS);
+ final int NUM_LOG_LINES = 1000;
+ final SplitLogManager slm = master.getSplitLogManager();
+ FileSystem fs = master.getFileSystem();
+
+ List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+ HRegionServer hrs = rsts.get(0).getRegionServer();
+ Path rootdir = FSUtils.getRootDir(conf);
+ final Path logDir = new Path(rootdir,
+ HLog.getHLogDirectoryName(hrs.getServerInfo().getServerName()));
+
+ installTable("table", "family", 40);
+ byte[] table = Bytes.toBytes("table");
+ Collection<HRegion> regions = new LinkedList<HRegion>(hrs.getOnlineRegions());
+ LOG.info("#regions = " + regions.size());
+ Iterator<HRegion> it = regions.iterator();
+ while (it.hasNext()) {
+ HRegion region = it.next();
+ HRegionInfo hri = region.getRegionInfo();
+ if (hri.isMetaRegion() || hri.isRootRegion()) {
+ it.remove();
+ }
+ }
+ makeHLog(hrs.getLog(), regions, "table",
+ NUM_LOG_LINES, 100);
+
+ slm.splitLogDistributed(logDir);
+
+ int count = 0;
+ for (HRegion rgn : regions) {
+
+ Path tdir = HTableDescriptor.getTableDir(rootdir, table);
+ Path editsdir =
+ HLog.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
+ rgn.getRegionInfo().getEncodedName()));
+ LOG.debug("checking edits dir " + editsdir);
+ FileStatus[] files = fs.listStatus(editsdir);
+ assertEquals(1, files.length);
+ int c = countHLog(files[0].getPath(), fs, conf);
+ count += c;
+ LOG.info(c + " edits in " + files[0].getPath());
+ }
+ assertEquals(NUM_LOG_LINES, count);
+ }
+
+ @Test
+ public void testWorkerAbort() throws Exception {
+ LOG.info("testWorkerAbort");
+ startCluster(1);
+ final int NUM_LOG_LINES = 10000;
+ final SplitLogManager slm = master.getSplitLogManager();
+ FileSystem fs = master.getFileSystem();
+
+ final List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+ HRegionServer hrs = rsts.get(0).getRegionServer();
+ Path rootdir = FSUtils.getRootDir(conf);
+ final Path logDir = new Path(rootdir,
+ HLog.getHLogDirectoryName(hrs.getServerInfo().getServerName()));
+
+ installTable("table", "family", 40);
+ byte[] table = Bytes.toBytes("table");
+ makeHLog(hrs.getLog(), hrs.getOnlineRegions(), "table",
+ NUM_LOG_LINES, 100);
+
+ new Thread() {
+ public void run() {
+ waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
+ for (RegionServerThread rst : rsts) {
+ rst.getRegionServer().abort("testing");
+ }
+ }
+ }.start();
+ // slm.splitLogDistributed(logDir);
+ FileStatus[] logfiles = fs.listStatus(logDir);
+ TaskBatch batch = new TaskBatch();
+ slm.installTask(logfiles[0].getPath().toString(), batch);
+ //waitForCounter but for one of the 2 counters
+ long curt = System.currentTimeMillis();
+ long endt = curt + 30000;
+ while (curt < endt) {
+ if ((tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
+ tot_wkr_final_transistion_failed.get()) == 0) {
+ Thread.yield();
+ curt = System.currentTimeMillis();
+ } else {
+ assertEquals(1, (tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
+ tot_wkr_final_transistion_failed.get()));
+ return;
+ }
+ }
+ assertEquals(1, batch.done);
+ // fail("region server completed the split before aborting");
+ return;
+ }
+
+ HTable installTable(String tname, String fname, int nrs ) throws Exception {
+ // Create a table with regions
+ byte [] table = Bytes.toBytes(tname);
+ byte [] family = Bytes.toBytes(fname);
+ LOG.info("Creating table with " + nrs + " regions");
+ HTable ht = TEST_UTIL.createTable(table, new byte[][]{family},
+ 3, Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), nrs);
+ NavigableSet<String> regions = getAllOnlineRegions(cluster);
+ assertEquals(nrs + 2, regions.size());
+ return ht;
+ }
+
+ void populateDataInTable(int nrows, String fname) throws Exception {
+ byte [] family = Bytes.toBytes(fname);
+
+ List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+ assertEquals(NUM_RS, rsts.size());
+
+ for (RegionServerThread rst : rsts) {
+ HRegionServer hrs = rst.getRegionServer();
+ Collection<HRegion> regions = hrs.getOnlineRegions();
+ for (HRegion r : regions) {
+ HRegionInfo hri = r.getRegionInfo();
+ if (hri.isMetaRegion() || hri.isRootRegion()) {
+ continue;
+ }
+ LOG.debug("adding data to rs = " + rst.getName() +
+ " region = "+ r.getRegionNameAsString());
+ HRegion region = hrs.getOnlineRegion(r.getRegionName());
+ putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), family);
+ }
+ }
+ }
+
+ public void makeHLog(HLog log,
+ Collection<HRegion> rgns, String tname,
+ int num_edits, int edit_size) throws IOException {
+
+ List<HRegion> regions = new ArrayList<HRegion>(rgns);
+ byte[] table = Bytes.toBytes(tname);
+ byte[] value = new byte[edit_size];
+ for (int i = 0; i < edit_size; i++) {
+ value[i] = (byte)('a' + (i % 26));
+ }
+ int n = regions.size();
+ int[] counts = new int[n];
+ int j = 0;
+ for (int i = 0; i < num_edits; i += 1) {
+ WALEdit e = new WALEdit();
+ byte [] row = Bytes.toBytes("r" + Integer.toString(i));
+ byte [] family = Bytes.toBytes("f");
+ byte [] qualifier = Bytes.toBytes("c" + Integer.toString(i));
+ e.add(new KeyValue(row, family, qualifier,
+ System.currentTimeMillis(), value));
+ // LOG.info("Region " + i + ": " + e);
+ j++;
+ log.append(regions.get(j % n).getRegionInfo(), table, e, System.currentTimeMillis());
+ counts[j % n] += 1;
+ // if ((i % 8096) == 0) {
+ // log.sync();
+ // }
+ }
+ log.sync();
+ log.close();
+ for (int i = 0; i < n; i++) {
+ LOG.info("region " + regions.get(i).getRegionNameAsString() +
+ " has " + counts[i] + " edits");
+ }
+ return;
+ }
+
+ private int countHLog(Path log, FileSystem fs, Configuration conf)
+ throws IOException {
+ int count = 0;
+ HLog.Reader in = HLog.getReader(fs, log, conf);
+ while (in.next() != null) {
+ count++;
+ }
+ return count;
+ }
+
+ private void putData(HRegion region, byte[] startRow, int numRows, byte [] qf,
+ byte [] ...families)
+ throws IOException {
+ for(int i = 0; i < numRows; i++) {
+ Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
+ for(byte [] family : families) {
+ put.add(family, qf, null);
+ }
+ region.put(put);
+ }
+ }
+
+ private NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster) {
+ NavigableSet<String> online = new TreeSet<String>();
+ for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
+ for (HRegion region : rst.getRegionServer().getOnlineRegions()) {
+ online.add(region.getRegionNameAsString());
+ }
+ }
+ return online;
+ }
+
+ private void waitForCounter(AtomicLong ctr, long oldval, long newval,
+ long timems) {
+ long curt = System.currentTimeMillis();
+ long endt = curt + timems;
+ while (curt < endt) {
+ if (ctr.get() == oldval) {
+ Thread.yield();
+ curt = System.currentTimeMillis();
+ } else {
+ assertEquals(newval, ctr.get());
+ return;
+ }
+ }
+ assertTrue(false);
+ }
+}