You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2014/12/02 18:20:56 UTC
[17/21] hbase git commit: HBASE-12522 Backport of write-ahead-log
refactoring and follow-ons.
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 592e4a6..8fd0ba7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -43,6 +43,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
@@ -55,7 +56,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -64,6 +65,15 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
+import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
+import org.apache.hadoop.hbase.wal.WALProvider.Writer;
+import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.DrainBarrier;
@@ -88,8 +98,8 @@ import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
/**
- * Implementation of {@link HLog} to go against {@link FileSystem}; i.e. keep WALs in HDFS.
- * Only one HLog/WAL is ever being written at a time. When a WAL hits a configured maximum size,
+ * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS.
+ * Only one WAL is ever being written at a time. When a WAL hits a configured maximum size,
* it is rolled. This is done internal to the implementation.
*
* <p>As data is flushed from the MemStore to other on-disk structures (files sorted by
@@ -101,11 +111,11 @@ import com.lmax.disruptor.dsl.ProducerType;
* <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
* (smaller) than the most-recent flush.
*
- * <p>To read an HLog, call {@link HLogFactory#createReader(org.apache.hadoop.fs.FileSystem,
- * org.apache.hadoop.fs.Path, org.apache.hadoop.conf.Configuration)}.
+ * <p>To read an WAL, call {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem,
+ * org.apache.hadoop.fs.Path)}.
*/
@InterfaceAudience.Private
-class FSHLog implements HLog, Syncable {
+public class FSHLog implements WAL {
// IMPLEMENTATION NOTES:
//
// At the core is a ring buffer. Our ring buffer is the LMAX Disruptor. It tries to
@@ -175,12 +185,6 @@ class FSHLog implements HLog, Syncable {
*/
private final Map<Thread, SyncFuture> syncFuturesByHandler;
- private final FileSystem fs;
- private final Path fullPathLogDir;
- private final Path fullPathOldLogDir;
- private final Configuration conf;
- private final String logFilePrefix;
-
/**
* The highest known outstanding unsync'd WALEdit sequence number where sequence number is the
* ring buffer sequence. Maintained by the ring buffer consumer.
@@ -194,9 +198,63 @@ class FSHLog implements HLog, Syncable {
*/
private final AtomicLong highestSyncedSequence = new AtomicLong(0);
- private WALCoprocessorHost coprocessorHost;
+ /**
+ * file system instance
+ */
+ private final FileSystem fs;
+ /**
+ * WAL directory, where all WAL files would be placed.
+ */
+ private final Path fullPathLogDir;
+ /**
+ * dir path where old logs are kept.
+ */
+ private final Path fullPathArchiveDir;
/**
+ * Matches just those wal files that belong to this wal instance.
+ */
+ private final PathFilter ourFiles;
+
+ /**
+ * Prefix of a WAL file, usually the region server name it is hosted on.
+ */
+ private final String logFilePrefix;
+
+ /**
+ * Suffix included on generated wal file names
+ */
+ private final String logFileSuffix;
+
+ /**
+ * Prefix used when checking for wal membership.
+ */
+ private final String prefixPathStr;
+
+ private final WALCoprocessorHost coprocessorHost;
+
+ /**
+ * conf object
+ */
+ private final Configuration conf;
+ /** Listeners that are called on WAL events. */
+ private final List<WALActionsListener> listeners = new CopyOnWriteArrayList<WALActionsListener>();
+
+ @Override
+ public void registerWALActionsListener(final WALActionsListener listener) {
+ this.listeners.add(listener);
+ }
+
+ @Override
+ public boolean unregisterWALActionsListener(final WALActionsListener listener) {
+ return this.listeners.remove(listener);
+ }
+
+ @Override
+ public WALCoprocessorHost getCoprocessorHost() {
+ return coprocessorHost;
+ }
+ /**
* FSDataOutputStream associated with the current SequenceFile.writer
*/
private FSDataOutputStream hdfs_out;
@@ -243,37 +301,23 @@ class FSHLog implements HLog, Syncable {
*/
private final ReentrantLock rollWriterLock = new ReentrantLock(true);
- // Listeners that are called on WAL events.
- private final List<WALActionsListener> listeners =
- new CopyOnWriteArrayList<WALActionsListener>();
-
private volatile boolean closed = false;
-
- /**
- * Set when this WAL is for meta only (we run a WAL for all regions except meta -- it has its
- * own dedicated WAL).
- */
- private final boolean forMeta;
+ private final AtomicBoolean shutdown = new AtomicBoolean(false);
// The timestamp (in ms) when the log file was created.
private final AtomicLong filenum = new AtomicLong(-1);
- // Number of transactions in the current Hlog.
+ // Number of transactions in the current Wal.
private final AtomicInteger numEntries = new AtomicInteger(0);
// If > than this size, roll the log.
private final long logrollsize;
/**
- * The total size of hlog
+ * The total size of wal
*/
private AtomicLong totalLogSize = new AtomicLong(0);
- /**
- * If WAL is enabled.
- */
- private final boolean enabled;
-
/*
* If more than this many logs, force flush of oldest region to oldest edit
* goes to disk. If too many and we crash, then will take forever replaying.
@@ -285,7 +329,6 @@ class FSHLog implements HLog, Syncable {
private final int closeErrorsTolerated;
private final AtomicInteger closeErrorCount = new AtomicInteger();
- private final MetricsWAL metrics;
// Region sequence id accounting across flushes and for knowing when we can GC a WAL. These
// sequence id numbers are by region and unrelated to the ring buffer sequence number accounting
@@ -334,6 +377,7 @@ class FSHLog implements HLog, Syncable {
/**
* WAL Comparator; it compares the timestamp (log filenum), present in the log file name.
+ * Throws an IllegalArgumentException if used to compare paths from different wals.
*/
public final Comparator<Path> LOG_NAME_COMPARATOR = new Comparator<Path>() {
@Override
@@ -380,15 +424,14 @@ class FSHLog implements HLog, Syncable {
* Constructor.
*
* @param fs filesystem handle
- * @param root path for stored and archived hlogs
- * @param logDir dir where hlogs are stored
+ * @param root path for stored and archived wals
+ * @param logDir dir where wals are stored
* @param conf configuration to use
* @throws IOException
*/
- public FSHLog(final FileSystem fs, final Path root, final String logDir,
- final Configuration conf)
- throws IOException {
- this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, false);
+ public FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf)
+ throws IOException {
+ this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
}
/**
@@ -396,46 +439,95 @@ class FSHLog implements HLog, Syncable {
*
* You should never have to load an existing log. If there is a log at
* startup, it should have already been processed and deleted by the time the
- * HLog object is started up.
+ * WAL object is started up.
*
* @param fs filesystem handle
* @param rootDir path to where logs and oldlogs
- * @param logDir dir where hlogs are stored
- * @param oldLogDir dir where hlogs are archived
+ * @param logDir dir where wals are stored
+ * @param archiveDir dir where wals are archived
* @param conf configuration to use
* @param listeners Listeners on WAL events. Listeners passed here will
* be registered before we do anything else; e.g. the
* Constructor {@link #rollWriter()}.
- * @param failIfLogDirExists If true IOException will be thrown if dir already exists.
+ * @param failIfWALExists If true IOException will be thrown if files related to this wal
+ * already exist.
* @param prefix should always be hostname and port in distributed env and
* it will be URL encoded before being used.
- * If prefix is null, "hlog" will be used
- * @param forMeta if this hlog is meant for meta updates
+ * If prefix is null, "wal" will be used
+ * @param suffix will be url encoded. null is treated as empty. non-empty must start with
+ * {@link DefaultWALProvider#WAL_FILE_NAME_DELIMITER}
* @throws IOException
*/
public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
- final String oldLogDir, final Configuration conf,
+ final String archiveDir, final Configuration conf,
final List<WALActionsListener> listeners,
- final boolean failIfLogDirExists, final String prefix, boolean forMeta)
- throws IOException {
- super();
+ final boolean failIfWALExists, final String prefix, final String suffix)
+ throws IOException {
this.fs = fs;
this.fullPathLogDir = new Path(rootDir, logDir);
- this.fullPathOldLogDir = new Path(rootDir, oldLogDir);
- this.forMeta = forMeta;
+ this.fullPathArchiveDir = new Path(rootDir, archiveDir);
this.conf = conf;
+ if (!fs.exists(fullPathLogDir) && !fs.mkdirs(fullPathLogDir)) {
+ throw new IOException("Unable to mkdir " + fullPathLogDir);
+ }
+
+ if (!fs.exists(this.fullPathArchiveDir)) {
+ if (!fs.mkdirs(this.fullPathArchiveDir)) {
+ throw new IOException("Unable to mkdir " + this.fullPathArchiveDir);
+ }
+ }
+
+ // If prefix is null||empty then just name it wal
+ this.logFilePrefix =
+ prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8");
+ // we only correctly differentiate suffices when numeric ones start with '.'
+ if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
+ throw new IllegalArgumentException("wal suffix must start with '" + WAL_FILE_NAME_DELIMITER +
+ "' but instead was '" + suffix + "'");
+ }
+ this.logFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
+ this.prefixPathStr = new Path(fullPathLogDir,
+ logFilePrefix + WAL_FILE_NAME_DELIMITER).toString();
+
+ this.ourFiles = new PathFilter() {
+ @Override
+ public boolean accept(final Path fileName) {
+ // The path should start with dir/<prefix> and end with our suffix
+ final String fileNameString = fileName.toString();
+ if (!fileNameString.startsWith(prefixPathStr)) {
+ return false;
+ }
+ if (logFileSuffix.isEmpty()) {
+ // in the case of the null suffix, we need to ensure the filename ends with a timestamp.
+ return org.apache.commons.lang.StringUtils.isNumeric(
+ fileNameString.substring(prefixPathStr.length()));
+ } else if (!fileNameString.endsWith(logFileSuffix)) {
+ return false;
+ }
+ return true;
+ }
+ };
+
+ if (failIfWALExists) {
+ final FileStatus[] walFiles = FSUtils.listStatus(fs, fullPathLogDir, ourFiles);
+ if (null != walFiles && 0 != walFiles.length) {
+ throw new IOException("Target WAL already exists within directory " + fullPathLogDir);
+ }
+ }
+
// Register listeners. TODO: Should this exist anymore? We have CPs?
if (listeners != null) {
for (WALActionsListener i: listeners) {
registerWALActionsListener(i);
}
}
+ this.coprocessorHost = new WALCoprocessorHost(this, conf);
// Get size to roll log at. Roll at 95% of HDFS block size so we avoid crossing HDFS blocks
// (it costs a little x'ing bocks)
- long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
- FSUtils.getDefaultBlockSize(this.fs, this.fullPathLogDir));
+ final long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
+ FSUtils.getDefaultBlockSize(this.fs, this.fullPathLogDir));
this.logrollsize =
(long)(blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));
@@ -444,31 +536,13 @@ class FSHLog implements HLog, Syncable {
FSUtils.getDefaultReplication(fs, this.fullPathLogDir));
this.lowReplicationRollLimit =
conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit", 5);
- this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 0);
- // If prefix is null||empty then just name it hlog
- this.logFilePrefix =
- prefix == null || prefix.isEmpty() ? "hlog" : URLEncoder.encode(prefix, "UTF8");
int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200);
LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) +
", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
- ", enabled=" + this.enabled + ", prefix=" + this.logFilePrefix + ", logDir=" +
- this.fullPathLogDir + ", oldLogDir=" + this.fullPathOldLogDir);
-
- boolean dirExists = false;
- if (failIfLogDirExists && (dirExists = this.fs.exists(fullPathLogDir))) {
- throw new IOException("Target HLog directory already exists: " + fullPathLogDir);
- }
- if (!dirExists && !fs.mkdirs(fullPathLogDir)) {
- throw new IOException("Unable to mkdir " + fullPathLogDir);
- }
-
- if (!fs.exists(this.fullPathOldLogDir)) {
- if (!fs.mkdirs(this.fullPathOldLogDir)) {
- throw new IOException("Unable to mkdir " + this.fullPathOldLogDir);
- }
- }
+ ", prefix=" + this.logFilePrefix + ", suffix=" + logFileSuffix + ", logDir=" +
+ this.fullPathLogDir + ", archiveDir=" + this.fullPathArchiveDir);
// rollWriter sets this.hdfs_out if it can.
rollWriter();
@@ -481,9 +555,6 @@ class FSHLog implements HLog, Syncable {
this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
this.getPipeLine = getGetPipeline(this.hdfs_out);
- this.coprocessorHost = new WALCoprocessorHost(this, conf);
- this.metrics = new MetricsWAL();
-
// This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is
// put on the ring buffer.
String hostingThreadName = Thread.currentThread().getName();
@@ -515,56 +586,22 @@ class FSHLog implements HLog, Syncable {
}
/**
- * Find the 'getNumCurrentReplicas' on the passed <code>os</code> stream.
- * @return Method or null.
+ * Get the backing files associated with this WAL.
+ * @return may be null if there are no files.
*/
- private static Method getGetNumCurrentReplicas(final FSDataOutputStream os) {
- // TODO: Remove all this and use the now publically available
- // HdfsDataOutputStream#getCurrentBlockReplication()
- Method m = null;
- if (os != null) {
- Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream().getClass();
- try {
- m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas", new Class<?>[] {});
- m.setAccessible(true);
- } catch (NoSuchMethodException e) {
- LOG.info("FileSystem's output stream doesn't support getNumCurrentReplicas; " +
- "HDFS-826 not available; fsOut=" + wrappedStreamClass.getName());
- } catch (SecurityException e) {
- LOG.info("No access to getNumCurrentReplicas on FileSystems's output stream; HDFS-826 " +
- "not available; fsOut=" + wrappedStreamClass.getName(), e);
- m = null; // could happen on setAccessible()
- }
- }
- if (m != null) {
- if (LOG.isTraceEnabled()) LOG.trace("Using getNumCurrentReplicas");
- }
- return m;
- }
-
- @Override
- public void registerWALActionsListener(final WALActionsListener listener) {
- this.listeners.add(listener);
- }
-
- @Override
- public boolean unregisterWALActionsListener(final WALActionsListener listener) {
- return this.listeners.remove(listener);
- }
-
- @Override
- public long getFilenum() {
- return this.filenum.get();
+ protected FileStatus[] getFiles() throws IOException {
+ return FSUtils.listStatus(fs, fullPathLogDir, ourFiles);
}
/**
- * Method used internal to this class and for tests only.
- * @return The wrapped stream our writer is using; its not the
- * writer's 'out' FSDatoOutputStream but the stream that this 'out' wraps
- * (In hdfs its an instance of DFSDataOutputStream).
- *
- * usage: see TestLogRolling.java
+ * Currently, we need to expose the writer's OutputStream to tests so that they can manipulate
+ * the default behavior (such as setting the maxRecoveryErrorCount value for example (see
+ * {@link TestWALReplay#testReplayEditsWrittenIntoWAL()}). This is done using reflection on the
+ * underlying HDFS OutputStream.
+ * NOTE: This could be removed once Hadoop1 support is removed.
+ * @return null if underlying stream is not ready.
*/
+ @VisibleForTesting
OutputStream getOutputStream() {
return this.hdfs_out.getWrappedStream();
}
@@ -574,12 +611,16 @@ class FSHLog implements HLog, Syncable {
return rollWriter(false);
}
+ /**
+ * retrieve the next path to use for writing.
+ * Increments the internal filenum.
+ */
private Path getNewPath() throws IOException {
this.filenum.set(System.currentTimeMillis());
- Path newPath = computeFilename();
+ Path newPath = getCurrentFileName();
while (fs.exists(newPath)) {
this.filenum.incrementAndGet();
- newPath = computeFilename();
+ newPath = getCurrentFileName();
}
return newPath;
}
@@ -588,7 +629,7 @@ class FSHLog implements HLog, Syncable {
long currentFilenum = this.filenum.get();
Path oldPath = null;
if (currentFilenum > 0) {
- // ComputeFilename will take care of meta hlog filename
+ // ComputeFilename will take care of meta wal filename
oldPath = computeFilename(currentFilenum);
} // I presume if currentFilenum is <= 0, this is first file and null for oldPath if fine?
return oldPath;
@@ -644,11 +685,11 @@ class FSHLog implements HLog, Syncable {
if (!force && (this.writer != null && this.numEntries.get() <= 0)) return null;
byte [][] regionsToFlush = null;
if (this.closed) {
- LOG.debug("HLog closed. Skipping rolling of writer");
+ LOG.debug("WAL closed. Skipping rolling of writer");
return regionsToFlush;
}
if (!closeBarrier.beginOp()) {
- LOG.debug("HLog closing. Skipping rolling of writer");
+ LOG.debug("WAL closing. Skipping rolling of writer");
return regionsToFlush;
}
TraceScope scope = Trace.startSpan("FSHLog.rollWriter");
@@ -656,7 +697,7 @@ class FSHLog implements HLog, Syncable {
Path oldPath = getOldPath();
Path newPath = getNewPath();
// Any exception from here on is catastrophic, non-recoverable so we currently abort.
- FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
+ Writer nextWriter = this.createWriterInstance(newPath);
FSDataOutputStream nextHdfsOut = null;
if (nextWriter instanceof ProtobufLogWriter) {
nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream();
@@ -688,18 +729,10 @@ class FSHLog implements HLog, Syncable {
* This method allows subclasses to inject different writers without having to
* extend other methods like rollWriter().
*
- * @param fs
- * @param path
- * @param conf
* @return Writer instance
- * @throws IOException
*/
- protected Writer createWriterInstance(final FileSystem fs, final Path path,
- final Configuration conf) throws IOException {
- if (forMeta) {
- //TODO: set a higher replication for the hlog files (HBASE-6773)
- }
- return HLogFactory.createWALWriter(fs, path, conf);
+ protected Writer createWriterInstance(final Path path) throws IOException {
+ return DefaultWALProvider.createWriter(conf, fs, path, false);
}
/**
@@ -747,7 +780,7 @@ class FSHLog implements HLog, Syncable {
* {@link #oldestUnflushedRegionSequenceIds} and {@link #lowestFlushingRegionSequenceIds}. If,
* for all regions, the value is lesser than the minimum of values present in the
* oldestFlushing/UnflushedSeqNums, then the wal file is eligible for archiving.
- * @param sequenceNums for a HLog, at the time when it was rolled.
+ * @param sequenceNums for a WAL, at the time when it was rolled.
* @param oldestFlushingMap
* @param oldestUnflushedMap
* @return true if wal is eligible for archiving, false otherwise.
@@ -816,7 +849,7 @@ class FSHLog implements HLog, Syncable {
if (i > 0) sb.append(", ");
sb.append(Bytes.toStringBinary(regions[i]));
}
- LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" +
+ LOG.info("Too many wals: logs=" + logCount + ", maxlogs=" +
this.maxLogs + "; forcing flush of " + regions.length + " regions(s): " +
sb.toString());
}
@@ -825,16 +858,23 @@ class FSHLog implements HLog, Syncable {
/**
* Cleans up current writer closing it and then puts in place the passed in
- * <code>nextWriter</code>
- *
- * @param oldPath
- * @param newPath
- * @param nextWriter
- * @param nextHdfsOut
- * @return <code>newPath</code>
- * @throws IOException
+ * <code>nextWriter</code>.
+ *
+ * In the case of creating a new WAL, oldPath will be null.
+ *
+ * In the case of rolling over from one file to the next, none of the params will be null.
+ *
+ * In the case of closing out this FSHLog with no further use newPath, nextWriter, and
+ * nextHdfsOut will be null.
+ *
+ * @param oldPath may be null
+ * @param newPath may be null
+ * @param nextWriter may be null
+ * @param nextHdfsOut may be null
+ * @return the passed in <code>newPath</code>
+ * @throws IOException if there is a problem flushing or closing the underlying FS
*/
- Path replaceWriter(final Path oldPath, final Path newPath, FSHLog.Writer nextWriter,
+ Path replaceWriter(final Path oldPath, final Path newPath, Writer nextWriter,
final FSDataOutputStream nextHdfsOut)
throws IOException {
// Ask the ring buffer writer to pause at a safe point. Once we do this, the writer
@@ -887,6 +927,7 @@ class FSHLog implements HLog, Syncable {
this.hdfs_out = nextHdfsOut;
int oldNumEntries = this.numEntries.get();
this.numEntries.set(0);
+ final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath));
if (oldPath != null) {
this.byWalRegionSequenceIds.put(oldPath, this.highestRegionSequenceIds);
this.highestRegionSequenceIds = new HashMap<byte[], Long>();
@@ -894,16 +935,16 @@ class FSHLog implements HLog, Syncable {
this.totalLogSize.addAndGet(oldFileLen);
LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries +
", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " +
- FSUtils.getPath(newPath));
+ newPathString);
} else {
- LOG.info("New WAL " + FSUtils.getPath(newPath));
+ LOG.info("New WAL " + newPathString);
}
} catch (InterruptedException ie) {
// Perpetuate the interrupt
Thread.currentThread().interrupt();
} catch (IOException e) {
long count = getUnflushedEntriesCount();
- LOG.error("Failed close of HLog writer " + oldPath + ", unflushedEntries=" + count, e);
+ LOG.error("Failed close of WAL writer " + oldPath + ", unflushedEntries=" + count, e);
throw new FailedLogCloseException(oldPath + ", unflushedEntries=" + count, e);
} finally {
try {
@@ -930,8 +971,16 @@ class FSHLog implements HLog, Syncable {
return getUnflushedEntriesCount() > 0;
}
+ /*
+ * only public so WALSplitter can use.
+ * @return archived location of a WAL file with the given path p
+ */
+ public static Path getWALArchivePath(Path archiveDir, Path p) {
+ return new Path(archiveDir, p.getName());
+ }
+
private void archiveLogFile(final Path p) throws IOException {
- Path newPath = getHLogArchivePath(this.fullPathOldLogDir, p);
+ Path newPath = getWALArchivePath(this.fullPathArchiveDir, p);
// Tell our listeners that a log is going to be archived.
if (!this.listeners.isEmpty()) {
for (WALActionsListener i : this.listeners) {
@@ -956,24 +1005,25 @@ class FSHLog implements HLog, Syncable {
* @return Path
*/
protected Path computeFilename(final long filenum) {
- this.filenum.set(filenum);
- return computeFilename();
+ if (filenum < 0) {
+ throw new RuntimeException("wal file number can't be < 0");
+ }
+ String child = logFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + logFileSuffix;
+ return new Path(fullPathLogDir, child);
}
/**
* This is a convenience method that computes a new filename with a given
- * using the current HLog file-number
+ * using the current WAL file-number
* @return Path
*/
- protected Path computeFilename() {
- if (this.filenum.get() < 0) {
- throw new RuntimeException("hlog file number can't be < 0");
- }
- String child = logFilePrefix + "." + filenum;
- if (forMeta) {
- child += HLog.META_HLOG_FILE_EXTN;
- }
- return new Path(fullPathLogDir, child);
+ public Path getCurrentFileName() {
+ return computeFilename(this.filenum.get());
+ }
+
+ @Override
+ public String toString() {
+ return "FSHLog " + logFilePrefix + ":" + logFileSuffix + "(num " + filenum + ")";
}
/**
@@ -986,26 +1036,23 @@ class FSHLog implements HLog, Syncable {
*/
protected long getFileNumFromFileName(Path fileName) {
if (fileName == null) throw new IllegalArgumentException("file name can't be null");
- // The path should start with dir/<prefix>.
- String prefixPathStr = new Path(fullPathLogDir, logFilePrefix + ".").toString();
- if (!fileName.toString().startsWith(prefixPathStr)) {
- throw new IllegalArgumentException("The log file " + fileName + " doesn't belong to" +
- " this regionserver " + prefixPathStr);
- }
- String chompedPath = fileName.toString().substring(prefixPathStr.length());
- if (forMeta) chompedPath = chompedPath.substring(0, chompedPath.indexOf(META_HLOG_FILE_EXTN));
+ if (!ourFiles.accept(fileName)) {
+ throw new IllegalArgumentException("The log file " + fileName +
+ " doesn't belong to this wal. (" + toString() + ")");
+ }
+ final String fileNameString = fileName.toString();
+ String chompedPath = fileNameString.substring(prefixPathStr.length(),
+ (fileNameString.length() - logFileSuffix.length()));
return Long.parseLong(chompedPath);
}
@Override
- public void closeAndDelete() throws IOException {
- close();
- if (!fs.exists(this.fullPathLogDir)) return;
- FileStatus[] files = fs.listStatus(this.fullPathLogDir);
- if (files != null) {
- for(FileStatus file : files) {
-
- Path p = getHLogArchivePath(this.fullPathOldLogDir, file.getPath());
+ public void close() throws IOException {
+ shutdown();
+ final FileStatus[] files = getFiles();
+ if (null != files && 0 != files.length) {
+ for (FileStatus file : files) {
+ Path p = getWALArchivePath(this.fullPathArchiveDir, file.getPath());
// Tell our listeners that a log is going to be archived.
if (!this.listeners.isEmpty()) {
for (WALActionsListener i : this.listeners) {
@@ -1024,54 +1071,53 @@ class FSHLog implements HLog, Syncable {
}
}
LOG.debug("Moved " + files.length + " WAL file(s) to " +
- FSUtils.getPath(this.fullPathOldLogDir));
- }
- if (!fs.delete(fullPathLogDir, true)) {
- LOG.info("Unable to delete " + fullPathLogDir);
+ FSUtils.getPath(this.fullPathArchiveDir));
}
+ LOG.info("Closed WAL: " + toString() );
}
@Override
- public void close() throws IOException {
- if (this.closed) return;
- try {
- // Prevent all further flushing and rolling.
- closeBarrier.stopAndDrainOps();
- } catch (InterruptedException e) {
- LOG.error("Exception while waiting for cache flushes and log rolls", e);
- Thread.currentThread().interrupt();
- }
-
- // Shutdown the disruptor. Will stop after all entries have been processed. Make sure we have
- // stopped incoming appends before calling this else it will not shutdown. We are
- // conservative below waiting a long time and if not elapsed, then halting.
- if (this.disruptor != null) {
- long timeoutms = conf.getLong("hbase.wal.disruptor.shutdown.timeout.ms", 60000);
+ public void shutdown() throws IOException {
+ if (shutdown.compareAndSet(false, true)) {
try {
- this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS);
- } catch (TimeoutException e) {
- LOG.warn("Timed out bringing down disruptor after " + timeoutms + "ms; forcing halt " +
- "(It is a problem if this is NOT an ABORT! -- DATALOSS!!!!)");
- this.disruptor.halt();
- this.disruptor.shutdown();
+ // Prevent all further flushing and rolling.
+ closeBarrier.stopAndDrainOps();
+ } catch (InterruptedException e) {
+ LOG.error("Exception while waiting for cache flushes and log rolls", e);
+ Thread.currentThread().interrupt();
}
- }
- // With disruptor down, this is safe to let go.
- if (this.appendExecutor != null) this.appendExecutor.shutdown();
- // Tell our listeners that the log is closing
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i : this.listeners) {
- i.logCloseRequested();
+ // Shutdown the disruptor. Will stop after all entries have been processed. Make sure we
+ // have stopped incoming appends before calling this else it will not shutdown. We are
+ // conservative below waiting a long time and if not elapsed, then halting.
+ if (this.disruptor != null) {
+ long timeoutms = conf.getLong("hbase.wal.disruptor.shutdown.timeout.ms", 60000);
+ try {
+ this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ LOG.warn("Timed out bringing down disruptor after " + timeoutms + "ms; forcing halt " +
+ "(It is a problem if this is NOT an ABORT! -- DATALOSS!!!!)");
+ this.disruptor.halt();
+ this.disruptor.shutdown();
+ }
+ }
+ // With disruptor down, this is safe to let go.
+ if (this.appendExecutor != null) this.appendExecutor.shutdown();
+
+ // Tell our listeners that the log is closing
+ if (!this.listeners.isEmpty()) {
+ for (WALActionsListener i : this.listeners) {
+ i.logCloseRequested();
+ }
+ }
+ this.closed = true;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing WAL writer in " + FSUtils.getPath(fullPathLogDir));
+ }
+ if (this.writer != null) {
+ this.writer.close();
+ this.writer = null;
}
- }
- this.closed = true;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Closing WAL writer in " + this.fullPathLogDir.toString());
- }
- if (this.writer != null) {
- this.writer.close();
- this.writer = null;
}
}
@@ -1083,60 +1129,18 @@ class FSHLog implements HLog, Syncable {
* @param clusterIds that have consumed the change
* @return New log key.
*/
- protected HLogKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum,
+ protected WALKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum,
long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce);
}
- @Override
- @VisibleForTesting
- public void append(HRegionInfo info, TableName tableName, WALEdit edits,
- final long now, HTableDescriptor htd, AtomicLong sequenceId)
- throws IOException {
- HLogKey logKey = new HLogKey(info.getEncodedNameAsBytes(), tableName, now);
- append(htd, info, logKey, edits, sequenceId, true, true, null);
- }
-
- @Override
- public long appendNoSync(final HRegionInfo info, TableName tableName, WALEdit edits,
- List<UUID> clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId,
- boolean inMemstore, long nonceGroup, long nonce) throws IOException {
- HLogKey logKey =
- new HLogKey(info.getEncodedNameAsBytes(), tableName, now, clusterIds, nonceGroup, nonce);
- return append(htd, info, logKey, edits, sequenceId, false, inMemstore, null);
- }
-
- @Override
- public long appendNoSync(final HTableDescriptor htd, final HRegionInfo info, final HLogKey key,
- final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore,
- final List<Cell> memstoreCells)
- throws IOException {
- return append(htd, info, key, edits, sequenceId, false, inMemstore, memstoreCells);
- }
-
- /**
- * Append a set of edits to the log. Log edits are keyed by (encoded) regionName, rowname, and
- * log-sequence-id.
- * @param key
- * @param edits
- * @param htd This comes in here just so it is available on a pre append for replications. Get
- * rid of it. It is kinda crazy this comes in here when we have tablename and regioninfo.
- * Replication gets its scope from the HTD.
- * @param hri region info
- * @param sync shall we sync after we call the append?
- * @param inMemstore
- * @param sequenceId The region sequence id reference.
- * @param memstoreCells
- * @return txid of this transaction or if nothing to do, the last txid
- * @throws IOException
- */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
justification="Will never be null")
- private long append(HTableDescriptor htd, final HRegionInfo hri, final HLogKey key,
- WALEdit edits, AtomicLong sequenceId, boolean sync, boolean inMemstore,
- List<Cell> memstoreCells)
- throws IOException {
- if (!this.enabled) return this.highestUnsyncedSequence;
+ @Override
+ public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key,
+ final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore,
+ final List<Cell> memstoreCells) throws IOException {
if (this.closed) throw new IOException("Cannot append; log is closed");
// Make a trace scope for the append. It is closed on other side of the ring buffer by the
// single consuming thread. Don't have to worry about it.
@@ -1157,9 +1161,6 @@ class FSHLog implements HLog, Syncable {
} finally {
this.disruptor.getRingBuffer().publish(sequence);
}
- // doSync is set in tests. Usually we arrive in here via appendNoSync w/ the sync called after
- // all edits on a handler have been added.
- if (sync) sync(sequence);
return sequence;
}
@@ -1303,7 +1304,7 @@ class FSHLog implements HLog, Syncable {
Trace.addTimelineAnnotation("writer synced");
currentSequence = updateHighestSyncedSequence(currentSequence);
} catch (IOException e) {
- LOG.error("Error syncing, request close of hlog ", e);
+ LOG.error("Error syncing, request close of wal ", e);
t = e;
} catch (Exception e) {
LOG.warn("UNEXPECTED", e);
@@ -1364,7 +1365,7 @@ class FSHLog implements HLog, Syncable {
LOG.warn("HDFS pipeline error detected. " + "Found "
+ numCurrentReplicas + " replicas but expecting no less than "
+ this.minTolerableReplication + " replicas. "
- + " Requesting close of hlog.");
+ + " Requesting close of wal.");
logRollNeeded = true;
// If rollWriter is requested, increase consecutiveLogRolls. Once it
// is larger than lowReplicationRollLimit, disable the
@@ -1448,10 +1449,7 @@ class FSHLog implements HLog, Syncable {
return syncFuture.reset(sequence, span);
}
- @Override
- public void postSync(final long timeInNanos, final int handlerSyncs) {
- // TODO: Add metric for handler syncs done at a time.
- if (this.metrics != null) metrics.finishSync(timeInNanos/1000000);
+ private void postSync(final long timeInNanos, final int handlerSyncs) {
if (timeInNanos > this.slowSyncNs) {
String msg =
new StringBuilder().append("Slow sync cost: ")
@@ -1460,19 +1458,57 @@ class FSHLog implements HLog, Syncable {
Trace.addTimelineAnnotation(msg);
LOG.info(msg);
}
+ if (!listeners.isEmpty()) {
+ for (WALActionsListener listener : listeners) {
+ listener.postSync(timeInNanos, handlerSyncs);
+ }
+ }
}
- @Override
- public long postAppend(final Entry e, final long elapsedTime) {
+ private long postAppend(final Entry e, final long elapsedTime) {
long len = 0;
- if (this.metrics == null) return len;
- for (Cell cell : e.getEdit().getCells()) len += CellUtil.estimatedSerializedSizeOf(cell);
- metrics.finishAppend(elapsedTime, len);
+ if (!listeners.isEmpty()) {
+ for (Cell cell : e.getEdit().getCells()) {
+ len += CellUtil.estimatedSerializedSizeOf(cell);
+ }
+ for (WALActionsListener listener : listeners) {
+ listener.postAppend(len, elapsedTime);
+ }
+ }
return len;
}
/**
- * This method gets the datanode replication count for the current HLog.
+ * Find the 'getNumCurrentReplicas' on the passed <code>os</code> stream.
+ * This is used for getting current replicas of a file being written.
+ * @return Method or null.
+ */
+ private Method getGetNumCurrentReplicas(final FSDataOutputStream os) {
+ // TODO: Remove all this and use the now publically available
+ // HdfsDataOutputStream#getCurrentBlockReplication()
+ Method m = null;
+ if (os != null) {
+ Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream().getClass();
+ try {
+ m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas", new Class<?>[] {});
+ m.setAccessible(true);
+ } catch (NoSuchMethodException e) {
+ LOG.info("FileSystem's output stream doesn't support getNumCurrentReplicas; " +
+ "HDFS-826 not available; fsOut=" + wrappedStreamClass.getName());
+ } catch (SecurityException e) {
+ LOG.info("No access to getNumCurrentReplicas on FileSystems's output stream; HDFS-826 " +
+ "not available; fsOut=" + wrappedStreamClass.getName(), e);
+ m = null; // could happen on setAccessible()
+ }
+ }
+ if (m != null) {
+ if (LOG.isTraceEnabled()) LOG.trace("Using getNumCurrentReplicas");
+ }
+ return m;
+ }
+
+ /**
+ * This method gets the datanode replication count for the current WAL.
*
* If the pipeline isn't started yet or is empty, you will get the default
* replication factor. Therefore, if this function returns 0, it means you
@@ -1483,10 +1519,12 @@ class FSHLog implements HLog, Syncable {
*
* @throws Exception
*/
+ @VisibleForTesting
int getLogReplication()
throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
- if (this.getNumCurrentReplicas != null && this.hdfs_out != null) {
- Object repl = this.getNumCurrentReplicas.invoke(getOutputStream(), NO_ARGS);
+ final OutputStream stream = getOutputStream();
+ if (this.getNumCurrentReplicas != null && stream != null) {
+ Object repl = this.getNumCurrentReplicas.invoke(stream, NO_ARGS);
if (repl instanceof Integer) {
return ((Integer)repl).intValue();
}
@@ -1494,32 +1532,6 @@ class FSHLog implements HLog, Syncable {
return 0;
}
- boolean canGetCurReplicas() {
- return this.getNumCurrentReplicas != null;
- }
-
- @Override
- public void hsync() throws IOException {
- TraceScope scope = Trace.startSpan("FSHLog.hsync");
- try {
- scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
- } finally {
- assert scope == NullScope.INSTANCE || !scope.isDetached();
- scope.close();
- }
- }
-
- @Override
- public void hflush() throws IOException {
- TraceScope scope = Trace.startSpan("FSHLog.hflush");
- try {
- scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
- } finally {
- assert scope == NullScope.INSTANCE || !scope.isDetached();
- scope.close();
- }
- }
-
@Override
public void sync() throws IOException {
TraceScope scope = Trace.startSpan("FSHLog.sync");
@@ -1546,7 +1558,8 @@ class FSHLog implements HLog, Syncable {
}
}
- void requestLogRoll() {
+ // public only until class moves to o.a.h.h.wal
+ public void requestLogRoll() {
if (!this.listeners.isEmpty()) {
for (WALActionsListener i: this.listeners) {
i.logRollRequested();
@@ -1554,25 +1567,21 @@ class FSHLog implements HLog, Syncable {
}
}
- /** @return How many items have been added to the log */
- int getNumEntries() {
- return numEntries.get();
- }
-
+ // public only until class moves to o.a.h.h.wal
/** @return the number of rolled log files */
public int getNumRolledLogFiles() {
return byWalRegionSequenceIds.size();
}
+ // public only until class moves to o.a.h.h.wal
/** @return the number of log files in use */
- @Override
public int getNumLogFiles() {
// +1 for current use log
return getNumRolledLogFiles() + 1;
}
+ // public only until class moves to o.a.h.h.wal
/** @return the size of log files in use */
- @Override
public long getLogFileSize() {
return this.totalLogSize.get();
}
@@ -1636,28 +1645,11 @@ class FSHLog implements HLog, Syncable {
}
}
- @Override
- public boolean isLowReplicationRollEnabled() {
+ @VisibleForTesting
+ boolean isLowReplicationRollEnabled() {
return lowReplicationRollEnabled;
}
- /**
- * Get the directory we are making logs in.
- *
- * @return dir
- */
- protected Path getDir() {
- return fullPathLogDir;
- }
-
- static Path getHLogArchivePath(Path oldLogDir, Path p) {
- return new Path(oldLogDir, p.getName());
- }
-
- static String formatRecoveredEditsFileName(final long seqid) {
- return String.format("%019d", seqid);
- }
-
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
@@ -1673,14 +1665,10 @@ class FSHLog implements HLog, Syncable {
}
final Path baseDir = FSUtils.getRootDir(conf);
- final Path oldLogDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
- HLogSplitter.split(baseDir, p, oldLogDir, fs, conf);
+ final Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
}
- @Override
- public WALCoprocessorHost getCoprocessorHost() {
- return coprocessorHost;
- }
@Override
public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
@@ -1932,7 +1920,7 @@ class FSHLog implements HLog, Syncable {
long start = EnvironmentEdgeManager.currentTime();
byte [] encodedRegionName = entry.getKey().getEncodedRegionName();
- long regionSequenceId = HLog.NO_SEQUENCE_ID;
+ long regionSequenceId = WALKey.NO_SEQUENCE_ID;
try {
// We are about to append this edit; update the region-scoped sequence number. Do it
// here inside this single appending/writing thread. Events are ordered on the ringbuffer
@@ -1975,7 +1963,7 @@ class FSHLog implements HLog, Syncable {
// Update metrics.
postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
} catch (Exception e) {
- LOG.fatal("Could not append. Requesting close of hlog", e);
+ LOG.fatal("Could not append. Requesting close of wal", e);
requestLogRoll();
throw e;
}
@@ -2006,7 +1994,7 @@ class FSHLog implements HLog, Syncable {
}
private static void usage() {
- System.err.println("Usage: HLog <ARGS>");
+ System.err.println("Usage: FSHLog <ARGS>");
System.err.println("Arguments:");
System.err.println(" --dump Dump textual representation of passed one or more files");
System.err.println(" For example: " +
@@ -2014,7 +2002,6 @@ class FSHLog implements HLog, Syncable {
System.err.println(" --split Split the passed directory of WAL logs");
System.err.println(" For example: " +
"FSHLog --split hdfs://example.com:9000/hbase/.logs/DIR");
- System.err.println(" --perf Write the same key <N> times to a WAL: e.g. FSHLog --perf 10");
}
/**
@@ -2029,28 +2016,14 @@ class FSHLog implements HLog, Syncable {
usage();
System.exit(-1);
}
- // either dump using the HLogPrettyPrinter or split, depending on args
+ // either dump using the WALPrettyPrinter or split, depending on args
if (args[0].compareTo("--dump") == 0) {
- HLogPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
+ WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
} else if (args[0].compareTo("--perf") == 0) {
- final int count = Integer.parseInt(args[1]);
- // Put up a WAL and just keep adding same edit to it. Simple perf test.
- Configuration conf = HBaseConfiguration.create();
- Path rootDir = FSUtils.getRootDir(conf);
- FileSystem fs = rootDir.getFileSystem(conf);
- FSHLog wal =
- new FSHLog(fs, rootDir, "perflog", "oldPerflog", conf, null, false, "perf", false);
- long start = System.nanoTime();
- WALEdit walEdit = new WALEdit();
- walEdit.add(new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"),
- Bytes.toBytes("qualifier"), -1, new byte [1000]));
- for (AtomicLong i = new AtomicLong(0); i.get() < count; i.incrementAndGet()) {
- wal.append(HRegionInfo.FIRST_META_REGIONINFO, TableName.META_TABLE_NAME, walEdit, start,
- HTableDescriptor.META_TABLEDESC, i);
- wal.sync();
- }
- wal.close();
- LOG.info("Write " + count + " 1k edits in " + (System.nanoTime() - start) + "nanos");
+ LOG.fatal("Please use the WALPerformanceEvaluation tool instead. i.e.:");
+ LOG.fatal("\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " +
+ args[1]);
+ System.exit(-1);
} else if (args[0].compareTo("--split") == 0) {
Configuration conf = HBaseConfiguration.create();
for (int i = 1; i < args.length; i++) {
@@ -2098,9 +2071,9 @@ class FSHLog implements HLog, Syncable {
}
/**
- * This method gets the pipeline for the current HLog.
- * @return
+ * This method gets the pipeline for the current WAL.
*/
+ @VisibleForTesting
DatanodeInfo[] getPipeLine() {
if (this.getPipeLine != null && this.hdfs_out != null) {
Object repl;
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index 0325f78..d9942b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -27,16 +27,19 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALKey;
+
/**
* A WAL Entry for {@link FSHLog} implementation. Immutable.
- * A subclass of {@link HLog.Entry} that carries extra info across the ring buffer such as
+ * A subclass of {@link Entry} that carries extra info across the ring buffer such as
* region sequence id (we want to use this later, just before we write the WAL to ensure region
* edits maintain order). The extra info added here is not 'serialized' as part of the WALEdit
* hence marked 'transient' to underline this fact. It also adds mechanism so we can wait on
* the assign of the region sequence id. See {@link #stampRegionSequenceId()}.
*/
@InterfaceAudience.Private
-class FSWALEntry extends HLog.Entry {
+class FSWALEntry extends Entry {
// The below data members are denoted 'transient' just to highlight these are not persisted;
// they are only in memory and held here while passing over the ring buffer.
private final transient long sequence;
@@ -46,7 +49,7 @@ class FSWALEntry extends HLog.Entry {
private final transient HRegionInfo hri;
private final transient List<Cell> memstoreCells;
- FSWALEntry(final long sequence, final HLogKey key, final WALEdit edit,
+ FSWALEntry(final long sequence, final WALKey key, final WALEdit edit,
final AtomicLong referenceToRegionSequenceId, final boolean inMemstore,
final HTableDescriptor htd, final HRegionInfo hri, List<Cell> memstoreCells) {
super(key, edit);
@@ -98,7 +101,7 @@ class FSWALEntry extends HLog.Entry {
CellUtil.setSequenceId(cell, regionSequenceId);
}
}
- HLogKey key = getKey();
+ WALKey key = getKey();
key.setLogSeqNum(regionSequenceId);
return regionSequenceId;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
deleted file mode 100644
index eb3692e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ /dev/null
@@ -1,445 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * HLog records all the edits to HStore. It is the hbase write-ahead-log (WAL).
- */
-@InterfaceAudience.Private
-// TODO: Rename interface to WAL
-public interface HLog {
- Log LOG = LogFactory.getLog(HLog.class);
- public static final long NO_SEQUENCE_ID = -1;
-
- /** File Extension used while splitting an HLog into regions (HBASE-2312) */
- // TODO: this seems like an implementation detail that does not belong here.
- String SPLITTING_EXT = "-splitting";
- boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
- /** The hbase:meta region's HLog filename extension.*/
- // TODO: Implementation detail. Does not belong in here.
- String META_HLOG_FILE_EXTN = ".meta";
-
- /**
- * Configuration name of HLog Trailer's warning size. If a waltrailer's size is greater than the
- * configured size, a warning is logged. This is used with Protobuf reader/writer.
- */
- // TODO: Implementation detail. Why in here?
- String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size";
- int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024; // 1MB
-
- // TODO: Implementation detail. Why in here?
- Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
- String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
- String SEQUENCE_ID_FILE_SUFFIX = "_seqid";
-
- /**
- * WAL Reader Interface
- */
- interface Reader {
- /**
- * @param fs File system.
- * @param path Path.
- * @param c Configuration.
- * @param s Input stream that may have been pre-opened by the caller; may be null.
- */
- void init(FileSystem fs, Path path, Configuration c, FSDataInputStream s) throws IOException;
-
- void close() throws IOException;
-
- Entry next() throws IOException;
-
- Entry next(Entry reuse) throws IOException;
-
- void seek(long pos) throws IOException;
-
- long getPosition() throws IOException;
- void reset() throws IOException;
-
- /**
- * @return the WALTrailer of the current HLog. It may be null in case of legacy or corrupt WAL
- * files.
- */
- // TODO: What we need a trailer on WAL for? It won't be present on last WAL most of the time.
- // What then?
- WALTrailer getWALTrailer();
- }
-
- /**
- * WAL Writer Intrface.
- */
- interface Writer {
- void init(FileSystem fs, Path path, Configuration c, boolean overwritable) throws IOException;
-
- void close() throws IOException;
-
- void sync() throws IOException;
-
- void append(Entry entry) throws IOException;
-
- long getLength() throws IOException;
-
- /**
- * Sets HLog/WAL's WALTrailer. This trailer is appended at the end of WAL on closing.
- * @param walTrailer trailer to append to WAL.
- */
- // TODO: Why a trailer on the log?
- void setWALTrailer(WALTrailer walTrailer);
- }
-
- /**
- * Utility class that lets us keep track of the edit and it's associated key. Only used when
- * splitting logs.
- */
- // TODO: Remove this Writable.
- // TODO: Why is this in here? Implementation detail?
- @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
- class Entry implements Writable {
- private WALEdit edit;
- private HLogKey key;
-
- public Entry() {
- edit = new WALEdit();
- key = new HLogKey();
- }
-
- /**
- * Constructor for both params
- *
- * @param edit log's edit
- * @param key log's key
- */
- public Entry(HLogKey key, WALEdit edit) {
- this.key = key;
- this.edit = edit;
- }
-
- /**
- * Gets the edit
- *
- * @return edit
- */
- public WALEdit getEdit() {
- return edit;
- }
-
- /**
- * Gets the key
- *
- * @return key
- */
- public HLogKey getKey() {
- return key;
- }
-
- /**
- * Set compression context for this entry.
- *
- * @param compressionContext Compression context
- */
- public void setCompressionContext(CompressionContext compressionContext) {
- edit.setCompressionContext(compressionContext);
- key.setCompressionContext(compressionContext);
- }
-
- @Override
- public String toString() {
- return this.key + "=" + this.edit;
- }
-
- @Override
- @SuppressWarnings("deprecation")
- public void write(DataOutput dataOutput) throws IOException {
- this.key.write(dataOutput);
- this.edit.write(dataOutput);
- }
-
- @Override
- public void readFields(DataInput dataInput) throws IOException {
- this.key.readFields(dataInput);
- this.edit.readFields(dataInput);
- }
- }
-
- /**
- * Registers WALActionsListener
- *
- * @param listener
- */
- void registerWALActionsListener(final WALActionsListener listener);
-
- /**
- * Unregisters WALActionsListener
- *
- * @param listener
- */
- boolean unregisterWALActionsListener(final WALActionsListener listener);
-
- /**
- * @return Current state of the monotonically increasing file id.
- */
- // TODO: Remove. Implementation detail.
- long getFilenum();
-
- /**
- * @return the number of HLog files
- */
- int getNumLogFiles();
-
- /**
- * @return the size of HLog files
- */
- long getLogFileSize();
-
- // TODO: Log rolling should not be in this interface.
- /**
- * Roll the log writer. That is, start writing log messages to a new file.
- *
- * <p>
- * The implementation is synchronized in order to make sure there's one rollWriter
- * running at any given time.
- *
- * @return If lots of logs, flush the returned regions so next time through we
- * can clean logs. Returns null if nothing to flush. Names are actual
- * region names as returned by {@link HRegionInfo#getEncodedName()}
- * @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException
- * @throws IOException
- */
- byte[][] rollWriter() throws FailedLogCloseException, IOException;
-
- /**
- * Roll the log writer. That is, start writing log messages to a new file.
- *
- * <p>
- * The implementation is synchronized in order to make sure there's one rollWriter
- * running at any given time.
- *
- * @param force
- * If true, force creation of a new writer even if no entries have
- * been written to the current writer
- * @return If lots of logs, flush the returned regions so next time through we
- * can clean logs. Returns null if nothing to flush. Names are actual
- * region names as returned by {@link HRegionInfo#getEncodedName()}
- * @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException
- * @throws IOException
- */
- byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException;
-
- /**
- * Shut down the log.
- *
- * @throws IOException
- */
- void close() throws IOException;
-
- /**
- * Shut down the log and delete the log directory.
- * Used by tests only and in rare cases where we need a log just temporarily while bootstrapping
- * a region or running migrations.
- *
- * @throws IOException
- */
- void closeAndDelete() throws IOException;
-
- /**
- * Same as {@link #appendNoSync(HRegionInfo, TableName, WALEdit, List, long, HTableDescriptor,
- * AtomicLong, boolean, long, long)}
- * except it causes a sync on the log
- * @param info
- * @param tableName
- * @param edits
- * @param now
- * @param htd
- * @param sequenceId
- * @throws IOException
- * @deprecated For tests only and even then, should use
- * {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean,
- * List)} and {@link #sync()} instead.
- */
- @Deprecated
- @VisibleForTesting
- public void append(HRegionInfo info, TableName tableName, WALEdit edits,
- final long now, HTableDescriptor htd, AtomicLong sequenceId) throws IOException;
-
- /**
- * For notification post append to the writer. Used by metrics system at least.
- * @param entry
- * @param elapsedTime
- * @return Size of this append.
- */
- long postAppend(final Entry entry, final long elapsedTime);
-
- /**
- * For notification post writer sync. Used by metrics system at least.
- * @param timeInMillis How long the filesystem sync took in milliseconds.
- * @param handlerSyncs How many sync handler calls were released by this call to filesystem
- * sync.
- */
- void postSync(final long timeInMillis, final int handlerSyncs);
-
- /**
- * Append a set of edits to the WAL. WAL edits are keyed by (encoded) regionName, rowname, and
- * log-sequence-id. The WAL is not flushed/sync'd after this transaction completes BUT on return
- * this edit must have its region edit/sequence id assigned else it messes up our unification
- * of mvcc and sequenceid.
- * @param info
- * @param tableName
- * @param edits
- * @param clusterIds
- * @param now
- * @param htd
- * @param sequenceId A reference to the atomic long the <code>info</code> region is using as
- * source of its incrementing edits sequence id. Inside in this call we will increment it and
- * attach the sequence to the edit we apply the WAL.
- * @param isInMemstore Always true except for case where we are writing a compaction completion
- * record into the WAL; in this case the entry is just so we can finish an unfinished compaction
- * -- it is not an edit for memstore.
- * @param nonceGroup
- * @param nonce
- * @return Returns a 'transaction id'. Do not use. This is an internal implementation detail and
- * cannot be respected in all implementations; i.e. the append/sync machine may or may not be
- * able to sync an explicit edit only (the current default implementation syncs up to the time
- * of the sync call syncing whatever is behind the sync).
- * @throws IOException
- * @deprecated Use {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean, List)}
- * instead because you can get back the region edit/sequenceid; it is set into the passed in
- * <code>key</code>.
- */
- @Deprecated
- long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
- List<UUID> clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId,
- boolean isInMemstore, long nonceGroup, long nonce) throws IOException;
-
- /**
- * Append a set of edits to the WAL. The WAL is not flushed/sync'd after this transaction
- * completes BUT on return this edit must have its region edit/sequence id assigned
- * else it messes up our unification of mvcc and sequenceid. On return <code>key</code> will
- * have the region edit/sequence id filled in.
- * @param info
- * @param key Modified by this call; we add to it this edits region edit/sequence id.
- * @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
- * sequence id that is after all currently appended edits.
- * @param htd
- * @param sequenceId A reference to the atomic long the <code>info</code> region is using as
- * source of its incrementing edits sequence id. Inside in this call we will increment it and
- * attach the sequence to the edit we apply the WAL.
- * @param inMemstore Always true except for case where we are writing a compaction completion
- * record into the WAL; in this case the entry is just so we can finish an unfinished compaction
- * -- it is not an edit for memstore.
- * @param memstoreCells list of Cells added into memstore
- * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
- * in it.
- * @throws IOException
- */
- long appendNoSync(HTableDescriptor htd, HRegionInfo info, HLogKey key, WALEdit edits,
- AtomicLong sequenceId, boolean inMemstore, List<Cell> memstoreCells) throws IOException;
-
- // TODO: Do we need all these versions of sync?
- void hsync() throws IOException;
-
- void hflush() throws IOException;
-
- /**
- * Sync what we have in the WAL.
- * @throws IOException
- */
- void sync() throws IOException;
-
- /**
- * Sync the WAL if the txId was not already sync'd.
- * @param txid Transaction id to sync to.
- * @throws IOException
- */
- void sync(long txid) throws IOException;
-
- /**
- * WAL keeps track of the sequence numbers that were not yet flushed from memstores
- * in order to be able to do cleanup. This method tells WAL that some region is about
- * to flush memstore.
- *
- * <p>We stash the oldest seqNum for the region, and let the the next edit inserted in this
- * region be recorded in {@link #append(HRegionInfo, TableName, WALEdit, long, HTableDescriptor,
- * AtomicLong)} as new oldest seqnum.
- * In case of flush being aborted, we put the stashed value back; in case of flush succeeding,
- * the seqNum of that first edit after start becomes the valid oldest seqNum for this region.
- *
- * @return true if the flush can proceed, false in case wal is closing (ususally, when server is
- * closing) and flush couldn't be started.
- */
- boolean startCacheFlush(final byte[] encodedRegionName);
-
- /**
- * Complete the cache flush.
- * @param encodedRegionName Encoded region name.
- */
- void completeCacheFlush(final byte[] encodedRegionName);
-
- /**
- * Abort a cache flush. Call if the flush fails. Note that the only recovery
- * for an aborted flush currently is a restart of the regionserver so the
- * snapshot content dropped by the failure gets restored to the memstore.v
- * @param encodedRegionName Encoded region name.
- */
- void abortCacheFlush(byte[] encodedRegionName);
-
- /**
- * @return Coprocessor host.
- */
- WALCoprocessorHost getCoprocessorHost();
-
- /**
- * Get LowReplication-Roller status
- *
- * @return lowReplicationRollEnabled
- */
- // TODO: This is implementation detail?
- boolean isLowReplicationRollEnabled();
-
- /** Gets the earliest sequence number in the memstore for this particular region.
- * This can serve as best-effort "recent" WAL number for this region.
- * @param encodedRegionName The region to get the number for.
- * @return The number if present, HConstants.NO_SEQNUM if absent.
- */
- long getEarliestMemstoreSeqNum(byte[] encodedRegionName);
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
deleted file mode 100644
index a54091d..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.io.InterruptedIOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
-import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
-import org.apache.hadoop.hbase.util.CancelableProgressable;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-
-@InterfaceAudience.Private
-public class HLogFactory {
- private static final Log LOG = LogFactory.getLog(HLogFactory.class);
-
- public static HLog createHLog(final FileSystem fs, final Path root, final String logName,
- final Configuration conf) throws IOException {
- return new FSHLog(fs, root, logName, conf);
- }
-
- public static HLog createHLog(final FileSystem fs, final Path root, final String logName,
- final String oldLogName, final Configuration conf) throws IOException {
- return new FSHLog(fs, root, logName, oldLogName, conf, null, true, null, false);
-}
-
- public static HLog createHLog(final FileSystem fs, final Path root, final String logName,
- final Configuration conf, final List<WALActionsListener> listeners,
- final String prefix) throws IOException {
- return new FSHLog(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
- true, prefix, false);
- }
-
- public static HLog createMetaHLog(final FileSystem fs, final Path root, final String logName,
- final Configuration conf, final List<WALActionsListener> listeners,
- final String prefix) throws IOException {
- return new FSHLog(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
- false, prefix, true);
- }
-
- /*
- * WAL Reader
- */
- private static Class<? extends Reader> logReaderClass;
-
- static void resetLogReaderClass() {
- logReaderClass = null;
- }
-
- public static HLog.Reader createReader(final FileSystem fs,
- final Path path, Configuration conf) throws IOException {
- return createReader(fs, path, conf, null);
- }
-
- /**
- * Create a reader for the WAL. If you are reading from a file that's being written to
- * and need to reopen it multiple times, use {@link HLog.Reader#reset()} instead of this method
- * then just seek back to the last known good position.
- * @return A WAL reader. Close when done with it.
- * @throws IOException
- */
- public static HLog.Reader createReader(final FileSystem fs, final Path path,
- Configuration conf, CancelableProgressable reporter) throws IOException {
- return createReader(fs, path, conf, reporter, true);
- }
-
- public static HLog.Reader createReader(final FileSystem fs, final Path path,
- Configuration conf, CancelableProgressable reporter, boolean allowCustom)
- throws IOException {
- if (allowCustom && (logReaderClass == null)) {
- logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
- ProtobufLogReader.class, Reader.class);
- }
- Class<? extends Reader> lrClass = allowCustom ? logReaderClass : ProtobufLogReader.class;
-
- try {
- // A hlog file could be under recovery, so it may take several
- // tries to get it open. Instead of claiming it is corrupted, retry
- // to open it up to 5 minutes by default.
- long startWaiting = EnvironmentEdgeManager.currentTime();
- long openTimeout = conf.getInt("hbase.hlog.open.timeout", 300000) + startWaiting;
- int nbAttempt = 0;
- while (true) {
- try {
- if (lrClass != ProtobufLogReader.class) {
- // User is overriding the WAL reader, let them.
- HLog.Reader reader = lrClass.newInstance();
- reader.init(fs, path, conf, null);
- return reader;
- } else {
- FSDataInputStream stream = fs.open(path);
- // Note that zero-length file will fail to read PB magic, and attempt to create
- // a non-PB reader and fail the same way existing code expects it to. If we get
- // rid of the old reader entirely, we need to handle 0-size files differently from
- // merely non-PB files.
- byte[] magic = new byte[ProtobufLogReader.PB_WAL_MAGIC.length];
- boolean isPbWal = (stream.read(magic) == magic.length)
- && Arrays.equals(magic, ProtobufLogReader.PB_WAL_MAGIC);
- HLog.Reader reader =
- isPbWal ? new ProtobufLogReader() : new SequenceFileLogReader();
- reader.init(fs, path, conf, stream);
- return reader;
- }
- } catch (IOException e) {
- String msg = e.getMessage();
- if (msg != null && (msg.contains("Cannot obtain block length")
- || msg.contains("Could not obtain the last block")
- || msg.matches("Blocklist for [^ ]* has changed.*"))) {
- if (++nbAttempt == 1) {
- LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
- }
- if (reporter != null && !reporter.progress()) {
- throw new InterruptedIOException("Operation is cancelled");
- }
- if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
- LOG.error("Can't open after " + nbAttempt + " attempts and "
- + (EnvironmentEdgeManager.currentTime() - startWaiting)
- + "ms " + " for " + path);
- } else {
- try {
- Thread.sleep(nbAttempt < 3 ? 500 : 1000);
- continue; // retry
- } catch (InterruptedException ie) {
- InterruptedIOException iioe = new InterruptedIOException();
- iioe.initCause(ie);
- throw iioe;
- }
- }
- }
- throw e;
- }
- }
- } catch (IOException ie) {
- throw ie;
- } catch (Exception e) {
- throw new IOException("Cannot get log reader", e);
- }
- }
-
- /*
- * WAL writer
- */
- private static Class<? extends Writer> logWriterClass;
-
- static void resetLogWriterClass() {
- logWriterClass = null;
- }
-
- /**
- * Create a writer for the WAL.
- * @return A WAL writer. Close when done with it.
- * @throws IOException
- */
- public static HLog.Writer createWALWriter(final FileSystem fs,
- final Path path, Configuration conf) throws IOException {
- return createWriter(fs, path, conf, false);
- }
-
- public static HLog.Writer createRecoveredEditsWriter(final FileSystem fs,
- final Path path, Configuration conf) throws IOException {
- return createWriter(fs, path, conf, true);
- }
-
- private static HLog.Writer createWriter(final FileSystem fs,
- final Path path, Configuration conf, boolean overwritable)
- throws IOException {
- try {
- if (logWriterClass == null) {
- logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl",
- ProtobufLogWriter.class, Writer.class);
- }
- HLog.Writer writer = (HLog.Writer)logWriterClass.newInstance();
- writer.init(fs, path, conf, overwritable);
- return writer;
- } catch (Exception e) {
- throw new IOException("cannot get log writer", e);
- }
- }
-}