You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/10/02 21:29:21 UTC
svn commit: r1393126 [2/4] - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/backup/example/
main/java/org/apache/hadoop/hbase/fs/
main/java/org/apache/hadoop/hbase/mapreduce/
main/java/org/apache/hadoop/hbase/master/ main/java/org/...
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1393126&r1=1393125&r2=1393126&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Tue Oct 2 19:29:19 2012
@@ -16,107 +16,37 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.DataInput;
import java.io.DataOutput;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
import java.util.NavigableSet;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.TreeSet;
import java.util.UUID;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.Syncable;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.regionserver.wal.HLogMetrics.Metric;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.HasThread;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+
-/**
- * HLog stores all the edits to the HStore. Its the hbase write-ahead-log
- * implementation.
- *
- * It performs logfile-rolling, so external callers are not aware that the
- * underlying file is being rolled.
- *
- * <p>
- * There is one HLog per RegionServer. All edits for all Regions carried by
- * a particular RegionServer are entered first in the HLog.
- *
- * <p>
- * Each HRegion is identified by a unique long <code>int</code>. HRegions do
- * not need to declare themselves before using the HLog; they simply include
- * their HRegion-id in the <code>append</code> or
- * <code>completeCacheFlush</code> calls.
- *
- * <p>
- * An HLog consists of multiple on-disk files, which have a chronological order.
- * As data is flushed to other (better) on-disk structures, the log becomes
- * obsolete. We can destroy all the log messages for a given HRegion-id up to
- * the most-recent CACHEFLUSH message from that HRegion.
- *
- * <p>
- * It's only practical to delete entire files. Thus, we delete an entire on-disk
- * file F when all of the messages in F have a log-sequence-id that's older
- * (smaller) than the most-recent CACHEFLUSH message for every HRegion that has
- * a message in F.
- *
- * <p>
- * Synchronized methods can never execute in parallel. However, between the
- * start of a cache flush and the completion point, appends are allowed but log
- * rolling is not. To prevent log rolling taking place during this period, a
- * separate reentrant lock is used.
- *
- * <p>To read an HLog, call {@link #getReader(org.apache.hadoop.fs.FileSystem,
- * org.apache.hadoop.fs.Path, org.apache.hadoop.conf.Configuration)}.
- *
- */
@InterfaceAudience.Private
-public class HLog implements Syncable {
- static final Log LOG = LogFactory.getLog(HLog.class);
- public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
- static final byte [] METAROW = Bytes.toBytes("METAROW");
+public interface HLog {
+ public static final Log LOG = LogFactory.getLog(HLog.class);
+
+ public static final byte[] METAFAMILY = Bytes.toBytes("METAFAMILY");
+ static final byte[] METAROW = Bytes.toBytes("METAROW");
/** File Extension used while splitting an HLog into regions (HBASE-2312) */
public static final String SPLITTING_EXT = "-splitting";
@@ -126,956 +56,226 @@ public class HLog implements Syncable {
* Name of directory that holds recovered edits written by the wal log
* splitting code, one per region
*/
- private static final String RECOVERED_EDITS_DIR = "recovered.edits";
- private static final Pattern EDITFILES_NAME_PATTERN =
- Pattern.compile("-?[0-9]+");
+ static final String RECOVERED_EDITS_DIR = "recovered.edits";
+ static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
-
- private final FileSystem fs;
- private final Path dir;
- private final Configuration conf;
- // Listeners that are called on WAL events.
- private List<WALActionsListener> listeners =
- new CopyOnWriteArrayList<WALActionsListener>();
- private final long optionalFlushInterval;
- private final long blocksize;
- private final String prefix;
- private final AtomicLong unflushedEntries = new AtomicLong(0);
- private volatile long syncedTillHere = 0;
- private long lastDeferredTxid;
- private final Path oldLogDir;
- private volatile boolean logRollRunning;
-
- private static Class<? extends Writer> logWriterClass;
- private static Class<? extends Reader> logReaderClass;
-
- private WALCoprocessorHost coprocessorHost;
-
-
- static void resetLogReaderClass() {
- HLog.logReaderClass = null;
- }
-
- private FSDataOutputStream hdfs_out; // FSDataOutputStream associated with the current SequenceFile.writer
- // Minimum tolerable replicas, if the actual value is lower than it,
- // rollWriter will be triggered
- private int minTolerableReplication;
- private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
- final static Object [] NO_ARGS = new Object []{};
public interface Reader {
void init(FileSystem fs, Path path, Configuration c) throws IOException;
+
void close() throws IOException;
+
Entry next() throws IOException;
+
Entry next(Entry reuse) throws IOException;
+
void seek(long pos) throws IOException;
+
long getPosition() throws IOException;
}
public interface Writer {
void init(FileSystem fs, Path path, Configuration c) throws IOException;
+
void close() throws IOException;
+
void sync() throws IOException;
+
void append(Entry entry) throws IOException;
+
long getLength() throws IOException;
}
- /*
- * Current log file.
- */
- Writer writer;
-
- /*
- * Map of all log files but the current one.
- */
- final SortedMap<Long, Path> outputfiles =
- Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
-
- /*
- * Map of encoded region names to their most recent sequence/edit id in their
- * memstore.
- */
- private final ConcurrentSkipListMap<byte [], Long> lastSeqWritten =
- new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
-
- private volatile boolean closed = false;
-
- private final AtomicLong logSeqNum = new AtomicLong(0);
-
- // The timestamp (in ms) when the log file was created.
- private volatile long filenum = -1;
-
- //number of transactions in the current Hlog.
- private final AtomicInteger numEntries = new AtomicInteger(0);
-
- // If live datanode count is lower than the default replicas value,
- // RollWriter will be triggered in each sync(So the RollWriter will be
- // triggered one by one in a short time). Using it as a workaround to slow
- // down the roll frequency triggered by checkLowReplication().
- private volatile int consecutiveLogRolls = 0;
- private final int lowReplicationRollLimit;
-
- // If consecutiveLogRolls is larger than lowReplicationRollLimit,
- // then disable the rolling in checkLowReplication().
- // Enable it if the replications recover.
- private volatile boolean lowReplicationRollEnabled = true;
-
- // If > than this size, roll the log. This is typically 0.95 times the size
- // of the default Hdfs block size.
- private final long logrollsize;
-
- // This lock prevents starting a log roll during a cache flush.
- // synchronized is insufficient because a cache flush spans two method calls.
- private final Lock cacheFlushLock = new ReentrantLock();
-
- // We synchronize on updateLock to prevent updates and to prevent a log roll
- // during an update
- // locked during appends
- private final Object updateLock = new Object();
- private final Object flushLock = new Object();
-
- private final boolean enabled;
-
- /*
- * If more than this many logs, force flush of oldest region to oldest edit
- * goes to disk. If too many and we crash, then will take forever replaying.
- * Keep the number of logs tidy.
- */
- private final int maxLogs;
-
/**
- * Thread that handles optional sync'ing
+ * Utility class that lets us keep track of the edit with it's key Only used
+ * when splitting logs
*/
- private final LogSyncer logSyncerThread;
-
- /** Number of log close errors tolerated before we abort */
- private final int closeErrorsTolerated;
-
- private final AtomicInteger closeErrorCount = new AtomicInteger();
-
- /**
- * Pattern used to validate a HLog file name
- */
- private static final Pattern pattern = Pattern.compile(".*\\.\\d*");
+ public static class Entry implements Writable {
+ private WALEdit edit;
+ private HLogKey key;
- static byte [] COMPLETE_CACHE_FLUSH;
- static {
- try {
- COMPLETE_CACHE_FLUSH =
- "HBASE::CACHEFLUSH".getBytes(HConstants.UTF8_ENCODING);
- } catch (UnsupportedEncodingException e) {
- assert(false);
+ public Entry() {
+ edit = new WALEdit();
+ key = new HLogKey();
}
- }
- public static class Metric {
- public long min = Long.MAX_VALUE;
- public long max = 0;
- public long total = 0;
- public int count = 0;
-
- synchronized void inc(final long val) {
- min = Math.min(min, val);
- max = Math.max(max, val);
- total += val;
- ++count;
+ /**
+ * Constructor for both params
+ *
+ * @param edit
+ * log's edit
+ * @param key
+ * log's key
+ */
+ public Entry(HLogKey key, WALEdit edit) {
+ super();
+ this.key = key;
+ this.edit = edit;
}
- synchronized Metric get() {
- Metric copy = new Metric();
- copy.min = min;
- copy.max = max;
- copy.total = total;
- copy.count = count;
- this.min = Long.MAX_VALUE;
- this.max = 0;
- this.total = 0;
- this.count = 0;
- return copy;
+ /**
+ * Gets the edit
+ *
+ * @return edit
+ */
+ public WALEdit getEdit() {
+ return edit;
}
- }
-
- // For measuring latency of writes
- private static Metric writeTime = new Metric();
- private static Metric writeSize = new Metric();
- // For measuring latency of syncs
- private static Metric syncTime = new Metric();
- //For measuring slow HLog appends
- private static AtomicLong slowHLogAppendCount = new AtomicLong();
- private static Metric slowHLogAppendTime = new Metric();
-
- public static Metric getWriteTime() {
- return writeTime.get();
- }
-
- public static Metric getWriteSize() {
- return writeSize.get();
- }
-
- public static Metric getSyncTime() {
- return syncTime.get();
- }
-
- public static long getSlowAppendCount() {
- return slowHLogAppendCount.get();
- }
-
- public static Metric getSlowAppendTime() {
- return slowHLogAppendTime.get();
- }
-
- /**
- * Constructor.
- *
- * @param fs filesystem handle
- * @param dir path to where hlogs are stored
- * @param oldLogDir path to where hlogs are archived
- * @param conf configuration to use
- * @throws IOException
- */
- public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
- final Configuration conf)
- throws IOException {
- this(fs, dir, oldLogDir, conf, null, true, null);
- }
-
- /**
- * Create an edit log at the given <code>dir</code> location.
- *
- * You should never have to load an existing log. If there is a log at
- * startup, it should have already been processed and deleted by the time the
- * HLog object is started up.
- *
- * @param fs filesystem handle
- * @param dir path to where hlogs are stored
- * @param oldLogDir path to where hlogs are archived
- * @param conf configuration to use
- * @param listeners Listeners on WAL events. Listeners passed here will
- * be registered before we do anything else; e.g. the
- * Constructor {@link #rollWriter()}.
- * @param prefix should always be hostname and port in distributed env and
- * it will be URL encoded before being used.
- * If prefix is null, "hlog" will be used
- * @throws IOException
- */
- public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
- final Configuration conf, final List<WALActionsListener> listeners,
- final String prefix) throws IOException {
- this(fs, dir, oldLogDir, conf, listeners, true, prefix);
- }
- /**
- * Create an edit log at the given <code>dir</code> location.
- *
- * You should never have to load an existing log. If there is a log at
- * startup, it should have already been processed and deleted by the time the
- * HLog object is started up.
- *
- * @param fs filesystem handle
- * @param dir path to where hlogs are stored
- * @param oldLogDir path to where hlogs are archived
- * @param conf configuration to use
- * @param listeners Listeners on WAL events. Listeners passed here will
- * be registered before we do anything else; e.g. the
- * Constructor {@link #rollWriter()}.
- * @param failIfLogDirExists If true IOException will be thrown if dir already exists.
- * @param prefix should always be hostname and port in distributed env and
- * it will be URL encoded before being used.
- * If prefix is null, "hlog" will be used
- * @throws IOException
- */
- public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
- final Configuration conf, final List<WALActionsListener> listeners,
- final boolean failIfLogDirExists, final String prefix)
- throws IOException {
- super();
- this.fs = fs;
- this.dir = dir;
- this.conf = conf;
- if (listeners != null) {
- for (WALActionsListener i: listeners) {
- registerWALActionsListener(i);
- }
- }
- this.blocksize = conf.getLong("hbase.regionserver.hlog.blocksize",
- getDefaultBlockSize());
- // Roll at 95% of block size.
- float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
- this.logrollsize = (long)(this.blocksize * multi);
- this.optionalFlushInterval =
- conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
- if (failIfLogDirExists && fs.exists(dir)) {
- throw new IOException("Target HLog directory already exists: " + dir);
- }
- if (!fs.mkdirs(dir)) {
- throw new IOException("Unable to mkdir " + dir);
- }
- this.oldLogDir = oldLogDir;
- if (!fs.exists(oldLogDir)) {
- if (!fs.mkdirs(this.oldLogDir)) {
- throw new IOException("Unable to mkdir " + this.oldLogDir);
- }
+ /**
+ * Gets the key
+ *
+ * @return key
+ */
+ public HLogKey getKey() {
+ return key;
}
- this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
- this.minTolerableReplication = conf.getInt(
- "hbase.regionserver.hlog.tolerable.lowreplication",
- this.fs.getDefaultReplication());
- this.lowReplicationRollLimit = conf.getInt(
- "hbase.regionserver.hlog.lowreplication.rolllimit", 5);
- this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
- this.closeErrorsTolerated = conf.getInt(
- "hbase.regionserver.logroll.errors.tolerated", 0);
-
- LOG.info("HLog configuration: blocksize=" +
- StringUtils.byteDesc(this.blocksize) +
- ", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
- ", enabled=" + this.enabled +
- ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
- // If prefix is null||empty then just name it hlog
- this.prefix = prefix == null || prefix.isEmpty() ?
- "hlog" : URLEncoder.encode(prefix, "UTF8");
- // rollWriter sets this.hdfs_out if it can.
- rollWriter();
-
- // handle the reflection necessary to call getNumCurrentReplicas()
- this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
-
- logSyncerThread = new LogSyncer(this.optionalFlushInterval);
- Threads.setDaemonThreadRunning(logSyncerThread.getThread(),
- Thread.currentThread().getName() + ".logSyncer");
- coprocessorHost = new WALCoprocessorHost(this, conf);
- }
-
- // use reflection to search for getDefaultBlockSize(Path f)
- // if the method doesn't exist, fall back to using getDefaultBlockSize()
- private long getDefaultBlockSize() throws IOException {
- Method m = null;
- Class<? extends FileSystem> cls = this.fs.getClass();
- try {
- m = cls.getMethod("getDefaultBlockSize",
- new Class<?>[] { Path.class });
- } catch (NoSuchMethodException e) {
- LOG.info("FileSystem doesn't support getDefaultBlockSize");
- } catch (SecurityException e) {
- LOG.info("Doesn't have access to getDefaultBlockSize on "
- + "FileSystems", e);
- m = null; // could happen on setAccessible()
+
+ /**
+ * Set compression context for this entry.
+ *
+ * @param compressionContext
+ * Compression context
+ */
+ public void setCompressionContext(CompressionContext compressionContext) {
+ edit.setCompressionContext(compressionContext);
+ key.setCompressionContext(compressionContext);
}
- if (null == m) {
- return this.fs.getDefaultBlockSize();
- } else {
- try {
- Object ret = m.invoke(this.fs, this.dir);
- return ((Long)ret).longValue();
- } catch (Exception e) {
- throw new IOException(e);
- }
+
+ @Override
+ public String toString() {
+ return this.key + "=" + this.edit;
}
- }
- /**
- * Find the 'getNumCurrentReplicas' on the passed <code>os</code> stream.
- * @return Method or null.
- */
- private Method getGetNumCurrentReplicas(final FSDataOutputStream os) {
- Method m = null;
- if (os != null) {
- Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream()
- .getClass();
- try {
- m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas",
- new Class<?>[] {});
- m.setAccessible(true);
- } catch (NoSuchMethodException e) {
- LOG.info("FileSystem's output stream doesn't support"
- + " getNumCurrentReplicas; --HDFS-826 not available; fsOut="
- + wrappedStreamClass.getName());
- } catch (SecurityException e) {
- LOG.info("Doesn't have access to getNumCurrentReplicas on "
- + "FileSystems's output stream --HDFS-826 not available; fsOut="
- + wrappedStreamClass.getName(), e);
- m = null; // could happen on setAccessible()
- }
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ this.key.write(dataOutput);
+ this.edit.write(dataOutput);
}
- if (m != null) {
- LOG.info("Using getNumCurrentReplicas--HDFS-826");
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ this.key.readFields(dataInput);
+ this.edit.readFields(dataInput);
}
- return m;
}
- public void registerWALActionsListener(final WALActionsListener listener) {
- this.listeners.add(listener);
- }
+ /*
+ * registers WALActionsListener
+ *
+ * @param listener
+ */
+ public void registerWALActionsListener(final WALActionsListener listener);
- public boolean unregisterWALActionsListener(final WALActionsListener listener) {
- return this.listeners.remove(listener);
- }
+ /*
+ * unregisters WALActionsListener
+ *
+ * @param listener
+ */
+ public boolean unregisterWALActionsListener(final WALActionsListener listener);
/**
* @return Current state of the monotonically increasing file id.
*/
- public long getFilenum() {
- return this.filenum;
- }
+ public long getFilenum();
/**
* Called by HRegionServer when it opens a new region to ensure that log
* sequence numbers are always greater than the latest sequence number of the
* region being brought on-line.
- *
- * @param newvalue We'll set log edit/sequence number to this value if it
- * is greater than the current value.
- */
- public void setSequenceNumber(final long newvalue) {
- for (long id = this.logSeqNum.get(); id < newvalue &&
- !this.logSeqNum.compareAndSet(id, newvalue); id = this.logSeqNum.get()) {
- // This could spin on occasion but better the occasional spin than locking
- // every increment of sequence number.
- LOG.debug("Changed sequenceid from " + logSeqNum + " to " + newvalue);
- }
- }
+ *
+ * @param newvalue
+ * We'll set log edit/sequence number to this value if it is greater
+ * than the current value.
+ */
+ public void setSequenceNumber(final long newvalue);
/**
* @return log sequence number
*/
- public long getSequenceNumber() {
- return logSeqNum.get();
- }
-
- /**
- * Method used internal to this class and for tests only.
- * @return The wrapped stream our writer is using; its not the
- * writer's 'out' FSDatoOutputStream but the stream that this 'out' wraps
- * (In hdfs its an instance of DFSDataOutputStream).
- */
- // usage: see TestLogRolling.java
- OutputStream getOutputStream() {
- return this.hdfs_out.getWrappedStream();
- }
+ public long getSequenceNumber();
/**
* Roll the log writer. That is, start writing log messages to a new file.
- *
+ *
* Because a log cannot be rolled during a cache flush, and a cache flush
* spans two method calls, a special lock needs to be obtained so that a cache
* flush cannot start when the log is being rolled and the log cannot be
* rolled during a cache flush.
- *
- * <p>Note that this method cannot be synchronized because it is possible that
+ *
+ * <p>
+ * Note that this method cannot be synchronized because it is possible that
* startCacheFlush runs, obtaining the cacheFlushLock, then this method could
* start which would obtain the lock on this but block on obtaining the
* cacheFlushLock and then completeCacheFlush could be called which would wait
* for the lock on this and consequently never release the cacheFlushLock
- *
- * @return If lots of logs, flush the returned regions so next time through
- * we can clean logs. Returns null if nothing to flush. Names are actual
- * region names as returned by {@link HRegionInfo#getEncodedName()}
+ *
+ * @return If lots of logs, flush the returned regions so next time through we
+ * can clean logs. Returns null if nothing to flush. Names are actual
+ * region names as returned by {@link HRegionInfo#getEncodedName()}
* @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException
* @throws IOException
*/
- public byte [][] rollWriter() throws FailedLogCloseException, IOException {
- return rollWriter(false);
- }
+ public byte[][] rollWriter() throws FailedLogCloseException, IOException;
/**
* Roll the log writer. That is, start writing log messages to a new file.
- *
+ *
* Because a log cannot be rolled during a cache flush, and a cache flush
* spans two method calls, a special lock needs to be obtained so that a cache
* flush cannot start when the log is being rolled and the log cannot be
* rolled during a cache flush.
- *
- * <p>Note that this method cannot be synchronized because it is possible that
+ *
+ * <p>
+ * Note that this method cannot be synchronized because it is possible that
* startCacheFlush runs, obtaining the cacheFlushLock, then this method could
* start which would obtain the lock on this but block on obtaining the
* cacheFlushLock and then completeCacheFlush could be called which would wait
* for the lock on this and consequently never release the cacheFlushLock
- *
- * @param force If true, force creation of a new writer even if no entries
- * have been written to the current writer
- * @return If lots of logs, flush the returned regions so next time through
- * we can clean logs. Returns null if nothing to flush. Names are actual
- * region names as returned by {@link HRegionInfo#getEncodedName()}
+ *
+ * @param force
+ * If true, force creation of a new writer even if no entries have
+ * been written to the current writer
+ * @return If lots of logs, flush the returned regions so next time through we
+ * can clean logs. Returns null if nothing to flush. Names are actual
+ * region names as returned by {@link HRegionInfo#getEncodedName()}
* @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException
* @throws IOException
*/
- public byte [][] rollWriter(boolean force)
- throws FailedLogCloseException, IOException {
- // Return if nothing to flush.
- if (!force && this.writer != null && this.numEntries.get() <= 0) {
- return null;
- }
- byte [][] regionsToFlush = null;
- this.cacheFlushLock.lock();
- try {
- this.logRollRunning = true;
- if (closed) {
- LOG.debug("HLog closed. Skipping rolling of writer");
- return regionsToFlush;
- }
- // Do all the preparation outside of the updateLock to block
- // as less as possible the incoming writes
- long currentFilenum = this.filenum;
- Path oldPath = null;
- if (currentFilenum > 0) {
- oldPath = computeFilename(currentFilenum);
- }
- this.filenum = System.currentTimeMillis();
- Path newPath = computeFilename();
-
- // Tell our listeners that a new log is about to be created
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i : this.listeners) {
- i.preLogRoll(oldPath, newPath);
- }
- }
- HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
- // Can we get at the dfsclient outputstream? If an instance of
- // SFLW, it'll have done the necessary reflection to get at the
- // protected field name.
- FSDataOutputStream nextHdfsOut = null;
- if (nextWriter instanceof SequenceFileLogWriter) {
- nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
- }
- // Tell our listeners that a new log was created
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i : this.listeners) {
- i.postLogRoll(oldPath, newPath);
- }
- }
-
- synchronized (updateLock) {
- // Clean up current writer.
- Path oldFile = cleanupCurrentWriter(currentFilenum);
- this.writer = nextWriter;
- this.hdfs_out = nextHdfsOut;
-
- LOG.info((oldFile != null?
- "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
- this.numEntries.get() +
- ", filesize=" +
- this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
- " for " + FSUtils.getPath(newPath));
- this.numEntries.set(0);
- }
- // Can we delete any of the old log files?
- if (this.outputfiles.size() > 0) {
- if (this.lastSeqWritten.isEmpty()) {
- LOG.debug("Last sequenceid written is empty. Deleting all old hlogs");
- // If so, then no new writes have come in since all regions were
- // flushed (and removed from the lastSeqWritten map). Means can
- // remove all but currently open log file.
- for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
- archiveLogFile(e.getValue(), e.getKey());
- }
- this.outputfiles.clear();
- } else {
- regionsToFlush = cleanOldLogs();
- }
- }
- } finally {
- try {
- this.logRollRunning = false;
- } finally {
- this.cacheFlushLock.unlock();
- }
- }
- return regionsToFlush;
- }
+ public byte[][] rollWriter(boolean force) throws FailedLogCloseException,
+ IOException;
/**
- * This method allows subclasses to inject different writers without having to
- * extend other methods like rollWriter().
+ * Shut down the log.
*
- * @param fs
- * @param path
- * @param conf
- * @return Writer instance
- * @throws IOException
- */
- protected Writer createWriterInstance(final FileSystem fs, final Path path,
- final Configuration conf) throws IOException {
- return createWriter(fs, path, conf);
- }
-
- /**
- * Get a reader for the WAL.
- * @param fs
- * @param path
- * @param conf
- * @return A WAL reader. Close when done with it.
- * @throws IOException
- */
- public static Reader getReader(final FileSystem fs,
- final Path path, Configuration conf)
- throws IOException {
- try {
-
- if (logReaderClass == null) {
-
- logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
- SequenceFileLogReader.class, Reader.class);
- }
-
-
- HLog.Reader reader = logReaderClass.newInstance();
- reader.init(fs, path, conf);
- return reader;
- } catch (IOException e) {
- throw e;
- }
- catch (Exception e) {
- throw new IOException("Cannot get log reader", e);
- }
- }
-
- /**
- * Get a writer for the WAL.
- * @param path
- * @param conf
- * @return A WAL writer. Close when done with it.
- * @throws IOException
- */
- public static Writer createWriter(final FileSystem fs,
- final Path path, Configuration conf)
- throws IOException {
- try {
- if (logWriterClass == null) {
- logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl",
- SequenceFileLogWriter.class, Writer.class);
- }
- HLog.Writer writer = (HLog.Writer) logWriterClass.newInstance();
- writer.init(fs, path, conf);
- return writer;
- } catch (Exception e) {
- throw new IOException("cannot get log writer", e);
- }
- }
-
- /*
- * Clean up old commit logs.
- * @return If lots of logs, flush the returned region so next time through
- * we can clean logs. Returns null if nothing to flush. Returns array of
- * encoded region names to flush.
- * @throws IOException
- */
- private byte [][] cleanOldLogs() throws IOException {
- Long oldestOutstandingSeqNum = getOldestOutstandingSeqNum();
- // Get the set of all log files whose last sequence number is smaller than
- // the oldest edit's sequence number.
- TreeSet<Long> sequenceNumbers = new TreeSet<Long>(this.outputfiles.headMap(
- oldestOutstandingSeqNum).keySet());
- // Now remove old log files (if any)
- int logsToRemove = sequenceNumbers.size();
- if (logsToRemove > 0) {
- if (LOG.isDebugEnabled()) {
- // Find associated region; helps debugging.
- byte [] oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
- LOG.debug("Found " + logsToRemove + " hlogs to remove" +
- " out of total " + this.outputfiles.size() + ";" +
- " oldest outstanding sequenceid is " + oldestOutstandingSeqNum +
- " from region " + Bytes.toStringBinary(oldestRegion));
- }
- for (Long seq : sequenceNumbers) {
- archiveLogFile(this.outputfiles.remove(seq), seq);
- }
- }
-
- // If too many log files, figure which regions we need to flush.
- // Array is an array of encoded region names.
- byte [][] regions = null;
- int logCount = this.outputfiles.size();
- if (logCount > this.maxLogs && logCount > 0) {
- // This is an array of encoded region names.
- regions = findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
- this.lastSeqWritten);
- if (regions != null) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < regions.length; i++) {
- if (i > 0) sb.append(", ");
- sb.append(Bytes.toStringBinary(regions[i]));
- }
- LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" +
- this.maxLogs + "; forcing flush of " + regions.length + " regions(s): " +
- sb.toString());
- }
- }
- return regions;
- }
-
- /**
- * Return regions (memstores) that have edits that are equal or less than
- * the passed <code>oldestWALseqid</code>.
- * @param oldestWALseqid
- * @param regionsToSeqids Encoded region names to sequence ids
- * @return All regions whose seqid is < than <code>oldestWALseqid</code> (Not
- * necessarily in order). Null if no regions found.
- */
- static byte [][] findMemstoresWithEditsEqualOrOlderThan(final long oldestWALseqid,
- final Map<byte [], Long> regionsToSeqids) {
- // This method is static so it can be unit tested the easier.
- List<byte []> regions = null;
- for (Map.Entry<byte [], Long> e: regionsToSeqids.entrySet()) {
- if (e.getValue().longValue() <= oldestWALseqid) {
- if (regions == null) regions = new ArrayList<byte []>();
- // Key is encoded region name.
- regions.add(e.getKey());
- }
- }
- return regions == null?
- null: regions.toArray(new byte [][] {HConstants.EMPTY_BYTE_ARRAY});
- }
-
- /*
- * @return Logs older than this id are safe to remove.
- */
- private Long getOldestOutstandingSeqNum() {
- return Collections.min(this.lastSeqWritten.values());
- }
-
- /**
- * @param oldestOutstandingSeqNum
- * @return (Encoded) name of oldest outstanding region.
- */
- private byte [] getOldestRegion(final Long oldestOutstandingSeqNum) {
- byte [] oldestRegion = null;
- for (Map.Entry<byte [], Long> e: this.lastSeqWritten.entrySet()) {
- if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) {
- // Key is encoded region name.
- oldestRegion = e.getKey();
- break;
- }
- }
- return oldestRegion;
- }
-
- /*
- * Cleans up current writer closing and adding to outputfiles.
- * Presumes we're operating inside an updateLock scope.
- * @return Path to current writer or null if none.
* @throws IOException
*/
- Path cleanupCurrentWriter(final long currentfilenum) throws IOException {
- Path oldFile = null;
- if (this.writer != null) {
- // Close the current writer, get a new one.
- try {
- // Wait till all current transactions are written to the hlog.
- // No new transactions can occur because we have the updatelock.
- if (this.unflushedEntries.get() != this.syncedTillHere) {
- LOG.debug("cleanupCurrentWriter " +
- " waiting for transactions to get synced " +
- " total " + this.unflushedEntries.get() +
- " synced till here " + syncedTillHere);
- sync();
- }
- this.writer.close();
- this.writer = null;
- closeErrorCount.set(0);
- } catch (IOException e) {
- LOG.error("Failed close of HLog writer", e);
- int errors = closeErrorCount.incrementAndGet();
- if (errors <= closeErrorsTolerated && !hasDeferredEntries()) {
- LOG.warn("Riding over HLog close failure! error count="+errors);
- } else {
- if (hasDeferredEntries()) {
- LOG.error("Aborting due to unflushed edits in HLog");
- }
- // Failed close of log file. Means we're losing edits. For now,
- // shut ourselves down to minimize loss. Alternative is to try and
- // keep going. See HBASE-930.
- FailedLogCloseException flce =
- new FailedLogCloseException("#" + currentfilenum);
- flce.initCause(e);
- throw flce;
- }
- }
- if (currentfilenum >= 0) {
- oldFile = computeFilename(currentfilenum);
- this.outputfiles.put(Long.valueOf(this.logSeqNum.get()), oldFile);
- }
- }
- return oldFile;
- }
-
- private void archiveLogFile(final Path p, final Long seqno) throws IOException {
- Path newPath = getHLogArchivePath(this.oldLogDir, p);
- LOG.info("moving old hlog file " + FSUtils.getPath(p) +
- " whose highest sequenceid is " + seqno + " to " +
- FSUtils.getPath(newPath));
-
- // Tell our listeners that a log is going to be archived.
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i : this.listeners) {
- i.preLogArchive(p, newPath);
- }
- }
- if (!this.fs.rename(p, newPath)) {
- throw new IOException("Unable to rename " + p + " to " + newPath);
- }
- // Tell our listeners that a log has been archived.
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i : this.listeners) {
- i.postLogArchive(p, newPath);
- }
- }
- }
-
- /**
- * This is a convenience method that computes a new filename with a given
- * using the current HLog file-number
- * @return Path
- */
- protected Path computeFilename() {
- return computeFilename(this.filenum);
- }
-
- /**
- * This is a convenience method that computes a new filename with a given
- * file-number.
- * @param filenum to use
- * @return Path
- */
- protected Path computeFilename(long filenum) {
- if (filenum < 0) {
- throw new RuntimeException("hlog file number can't be < 0");
- }
- return new Path(dir, prefix + "." + filenum);
- }
+ public void close() throws IOException;
/**
* Shut down the log and delete the log directory
- *
+ *
* @throws IOException
*/
- public void closeAndDelete() throws IOException {
- close();
- if (!fs.exists(this.dir)) return;
- FileStatus[] files = fs.listStatus(this.dir);
- for(FileStatus file : files) {
-
- Path p = getHLogArchivePath(this.oldLogDir, file.getPath());
- // Tell our listeners that a log is going to be archived.
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i : this.listeners) {
- i.preLogArchive(file.getPath(), p);
- }
- }
-
- if (!fs.rename(file.getPath(),p)) {
- throw new IOException("Unable to rename " + file.getPath() + " to " + p);
- }
- // Tell our listeners that a log was archived.
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i : this.listeners) {
- i.postLogArchive(file.getPath(), p);
- }
- }
- }
- LOG.debug("Moved " + files.length + " log files to " +
- FSUtils.getPath(this.oldLogDir));
- if (!fs.delete(dir, true)) {
- LOG.info("Unable to delete " + dir);
- }
- }
+ public void closeAndDelete() throws IOException;
/**
- * Shut down the log.
- *
- * @throws IOException
- */
- public void close() throws IOException {
- try {
- logSyncerThread.close();
- // Make sure we synced everything
- logSyncerThread.join(this.optionalFlushInterval*2);
- } catch (InterruptedException e) {
- LOG.error("Exception while waiting for syncer thread to die", e);
- }
-
- cacheFlushLock.lock();
- try {
- // Tell our listeners that the log is closing
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i : this.listeners) {
- i.logCloseRequested();
- }
- }
- synchronized (updateLock) {
- this.closed = true;
- if (LOG.isDebugEnabled()) {
- LOG.debug("closing hlog writer in " + this.dir.toString());
- }
- if (this.writer != null) {
- this.writer.close();
- }
- }
- } finally {
- cacheFlushLock.unlock();
- }
- }
-
- /**
- * @param now
- * @param regionName
- * @param tableName
- * @param clusterId
- * @return New log key.
- */
- protected HLogKey makeKey(byte[] regionName, byte[] tableName, long seqnum,
- long now, UUID clusterId) {
- return new HLogKey(regionName, tableName, seqnum, now, clusterId);
- }
-
-
- /** Append an entry to the log.
- *
+ * Append an entry to the log.
+ *
* @param regionInfo
* @param logEdit
* @param logKey
- * @param doSync shall we sync after writing the transaction
+ * @param doSync
+ * shall we sync after writing the transaction
* @return The txid of this transaction
* @throws IOException
*/
public long append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit,
- HTableDescriptor htd, boolean doSync)
- throws IOException {
- if (this.closed) {
- throw new IOException("Cannot append; log is closed");
- }
- long txid = 0;
- synchronized (updateLock) {
- long seqNum = obtainSeqNum();
- logKey.setLogSeqNum(seqNum);
- // The 'lastSeqWritten' map holds the sequence number of the oldest
- // write for each region (i.e. the first edit added to the particular
- // memstore). When the cache is flushed, the entry for the
- // region being flushed is removed if the sequence number of the flush
- // is greater than or equal to the value in lastSeqWritten.
- this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
- Long.valueOf(seqNum));
- doWrite(regionInfo, logKey, logEdit, htd);
- txid = this.unflushedEntries.incrementAndGet();
- this.numEntries.incrementAndGet();
- if (htd.isDeferredLogFlush()) {
- lastDeferredTxid = txid;
- }
- }
-
- // Sync if catalog region, and if not then check if that table supports
- // deferred log flushing
- if (doSync &&
- (regionInfo.isMetaRegion() ||
- !htd.isDeferredLogFlush())) {
- // sync txn to file system
- this.sync(txid);
- }
- return txid;
- }
+ HTableDescriptor htd, boolean doSync) throws IOException;
/**
* Only used in tests.
- *
+ *
* @param info
* @param tableName
* @param edits
@@ -1083,913 +283,117 @@ public class HLog implements Syncable {
* @param htd
* @throws IOException
*/
- public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
- final long now, HTableDescriptor htd)
- throws IOException {
- append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd);
- }
-
- /**
- * Append a set of edits to the log. Log edits are keyed by (encoded)
- * regionName, rowname, and log-sequence-id.
- *
- * Later, if we sort by these keys, we obtain all the relevant edits for a
- * given key-range of the HRegion (TODO). Any edits that do not have a
- * matching COMPLETE_CACHEFLUSH message can be discarded.
- *
- * <p>
- * Logs cannot be restarted once closed, or once the HLog process dies. Each
- * time the HLog starts, it must create a new log. This means that other
- * systems should process the log appropriately upon each startup (and prior
- * to initializing HLog).
- *
- * synchronized prevents appends during the completion of a cache flush or for
- * the duration of a log roll.
- *
- * @param info
- * @param tableName
- * @param edits
- * @param clusterId The originating clusterId for this edit (for replication)
- * @param now
- * @param doSync shall we sync?
- * @return txid of this transaction
- * @throws IOException
- */
- private long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId,
- final long now, HTableDescriptor htd, boolean doSync)
- throws IOException {
- if (edits.isEmpty()) return this.unflushedEntries.get();;
- if (this.closed) {
- throw new IOException("Cannot append; log is closed");
- }
- long txid = 0;
- synchronized (this.updateLock) {
- long seqNum = obtainSeqNum();
- // The 'lastSeqWritten' map holds the sequence number of the oldest
- // write for each region (i.e. the first edit added to the particular
- // memstore). . When the cache is flushed, the entry for the
- // region being flushed is removed if the sequence number of the flush
- // is greater than or equal to the value in lastSeqWritten.
- // Use encoded name. Its shorter, guaranteed unique and a subset of
- // actual name.
- byte [] encodedRegionName = info.getEncodedNameAsBytes();
- this.lastSeqWritten.putIfAbsent(encodedRegionName, seqNum);
- HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
- doWrite(info, logKey, edits, htd);
- this.numEntries.incrementAndGet();
- txid = this.unflushedEntries.incrementAndGet();
- if (htd.isDeferredLogFlush()) {
- lastDeferredTxid = txid;
- }
- }
- // Sync if catalog region, and if not then check if that table supports
- // deferred log flushing
- if (doSync &&
- (info.isMetaRegion() ||
- !htd.isDeferredLogFlush())) {
- // sync txn to file system
- this.sync(txid);
- }
- return txid;
- }
+ public void append(HRegionInfo info, byte[] tableName, WALEdit edits,
+ final long now, HTableDescriptor htd) throws IOException;
/**
* Append a set of edits to the log. Log edits are keyed by (encoded)
- * regionName, rowname, and log-sequence-id. The HLog is not flushed
- * after this transaction is written to the log.
- *
+ * regionName, rowname, and log-sequence-id. The HLog is not flushed after
+ * this transaction is written to the log.
+ *
* @param info
* @param tableName
* @param edits
- * @param clusterId The originating clusterId for this edit (for replication)
+ * @param clusterId
+ * The originating clusterId for this edit (for replication)
* @param now
* @return txid of this transaction
* @throws IOException
*/
- public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits,
- UUID clusterId, final long now, HTableDescriptor htd)
- throws IOException {
- return append(info, tableName, edits, clusterId, now, htd, false);
- }
+ public long appendNoSync(HRegionInfo info, byte[] tableName, WALEdit edits,
+ UUID clusterId, final long now, HTableDescriptor htd) throws IOException;
/**
* Append a set of edits to the log. Log edits are keyed by (encoded)
- * regionName, rowname, and log-sequence-id. The HLog is flushed
- * after this transaction is written to the log.
- *
+ * regionName, rowname, and log-sequence-id. The HLog is flushed after this
+ * transaction is written to the log.
+ *
* @param info
* @param tableName
* @param edits
- * @param clusterId The originating clusterId for this edit (for replication)
+ * @param clusterId
+ * The originating clusterId for this edit (for replication)
* @param now
+ * @param htd
* @return txid of this transaction
* @throws IOException
*/
- public long append(HRegionInfo info, byte [] tableName, WALEdit edits,
- UUID clusterId, final long now, HTableDescriptor htd)
- throws IOException {
- return append(info, tableName, edits, clusterId, now, htd, true);
- }
+ public long append(HRegionInfo info, byte[] tableName, WALEdit edits,
+ UUID clusterId, final long now, HTableDescriptor htd) throws IOException;
- /**
- * This class is responsible to hold the HLog's appended Entry list
- * and to sync them according to a configurable interval.
- *
- * Deferred log flushing works first by piggy backing on this process by
- * simply not sync'ing the appended Entry. It can also be sync'd by other
- * non-deferred log flushed entries outside of this thread.
- */
- class LogSyncer extends HasThread {
-
- private final long optionalFlushInterval;
-
- private AtomicBoolean closeLogSyncer = new AtomicBoolean(false);
-
- // List of pending writes to the HLog. There corresponds to transactions
- // that have not yet returned to the client. We keep them cached here
- // instead of writing them to HDFS piecemeal, because the HDFS write
- // method is pretty heavyweight as far as locking is concerned. The
- // goal is to increase the batchsize for writing-to-hdfs as well as
- // sync-to-hdfs, so that we can get better system throughput.
- private List<Entry> pendingWrites = new LinkedList<Entry>();
+ public void hsync() throws IOException;
- LogSyncer(long optionalFlushInterval) {
- this.optionalFlushInterval = optionalFlushInterval;
- }
+ public void hflush() throws IOException;
- @Override
- public void run() {
- try {
- // awaiting with a timeout doesn't always
- // throw exceptions on interrupt
- while(!this.isInterrupted() && !closeLogSyncer.get()) {
-
- try {
- if (unflushedEntries.get() <= syncedTillHere) {
- synchronized (closeLogSyncer) {
- closeLogSyncer.wait(this.optionalFlushInterval);
- }
- }
- // Calling sync since we waited or had unflushed entries.
- // Entries appended but not sync'd are taken care of here AKA
- // deferred log flush
- sync();
- } catch (IOException e) {
- LOG.error("Error while syncing, requesting close of hlog ", e);
- requestLogRoll();
- }
- }
- } catch (InterruptedException e) {
- LOG.debug(getName() + " interrupted while waiting for sync requests");
- } finally {
- LOG.info(getName() + " exiting");
- }
- }
+ public void sync() throws IOException;
- // appends new writes to the pendingWrites. It is better to keep it in
- // our own queue rather than writing it to the HDFS output stream because
- // HDFSOutputStream.writeChunk is not lightweight at all.
- synchronized void append(Entry e) throws IOException {
- pendingWrites.add(e);
- }
-
- // Returns all currently pending writes. New writes
- // will accumulate in a new list.
- synchronized List<Entry> getPendingWrites() {
- List<Entry> save = this.pendingWrites;
- this.pendingWrites = new LinkedList<Entry>();
- return save;
- }
-
- // writes out pending entries to the HLog
- void hlogFlush(Writer writer, List<Entry> pending) throws IOException {
- if (pending == null) return;
-
- // write out all accumulated Entries to hdfs.
- for (Entry e : pending) {
- writer.append(e);
- }
- }
-
- void close() {
- synchronized (closeLogSyncer) {
- closeLogSyncer.set(true);
- closeLogSyncer.notifyAll();
- }
- }
- }
-
- // sync all known transactions
- private void syncer() throws IOException {
- syncer(this.unflushedEntries.get()); // sync all pending items
- }
-
- // sync all transactions upto the specified txid
- private void syncer(long txid) throws IOException {
- Writer tempWriter;
- synchronized (this.updateLock) {
- if (this.closed) return;
- tempWriter = this.writer; // guaranteed non-null
- }
- // if the transaction that we are interested in is already
- // synced, then return immediately.
- if (txid <= this.syncedTillHere) {
- return;
- }
- try {
- long doneUpto;
- long now = System.currentTimeMillis();
- // First flush all the pending writes to HDFS. Then
- // issue the sync to HDFS. If sync is successful, then update
- // syncedTillHere to indicate that transactions till this
- // number has been successfully synced.
- synchronized (flushLock) {
- if (txid <= this.syncedTillHere) {
- return;
- }
- doneUpto = this.unflushedEntries.get();
- List<Entry> pending = logSyncerThread.getPendingWrites();
- try {
- logSyncerThread.hlogFlush(tempWriter, pending);
- } catch(IOException io) {
- synchronized (this.updateLock) {
- // HBASE-4387, HBASE-5623, retry with updateLock held
- tempWriter = this.writer;
- logSyncerThread.hlogFlush(tempWriter, pending);
- }
- }
- }
- // another thread might have sync'ed avoid double-sync'ing
- if (txid <= this.syncedTillHere) {
- return;
- }
- try {
- tempWriter.sync();
- } catch(IOException io) {
- synchronized (this.updateLock) {
- // HBASE-4387, HBASE-5623, retry with updateLock held
- tempWriter = this.writer;
- tempWriter.sync();
- }
- }
- this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
-
- syncTime.inc(System.currentTimeMillis() - now);
- if (!this.logRollRunning) {
- checkLowReplication();
- try {
- if (tempWriter.getLength() > this.logrollsize) {
- requestLogRoll();
- }
- } catch (IOException x) {
- LOG.debug("Log roll failed and will be retried. (This is not an error)");
- }
- }
- } catch (IOException e) {
- LOG.fatal("Could not sync. Requesting close of hlog", e);
- requestLogRoll();
- throw e;
- }
- }
-
- private void checkLowReplication() {
- // if the number of replicas in HDFS has fallen below the configured
- // value, then roll logs.
- try {
- int numCurrentReplicas = getLogReplication();
- if (numCurrentReplicas != 0
- && numCurrentReplicas < this.minTolerableReplication) {
- if (this.lowReplicationRollEnabled) {
- if (this.consecutiveLogRolls < this.lowReplicationRollLimit) {
- LOG.warn("HDFS pipeline error detected. " + "Found "
- + numCurrentReplicas + " replicas but expecting no less than "
- + this.minTolerableReplication + " replicas. "
- + " Requesting close of hlog.");
- requestLogRoll();
- // If rollWriter is requested, increase consecutiveLogRolls. Once it
- // is larger than lowReplicationRollLimit, disable the
- // LowReplication-Roller
- this.consecutiveLogRolls++;
- } else {
- LOG.warn("Too many consecutive RollWriter requests, it's a sign of "
- + "the total number of live datanodes is lower than the tolerable replicas.");
- this.consecutiveLogRolls = 0;
- this.lowReplicationRollEnabled = false;
- }
- }
- } else if (numCurrentReplicas >= this.minTolerableReplication) {
-
- if (!this.lowReplicationRollEnabled) {
- // The new writer's log replicas is always the default value.
- // So we should not enable LowReplication-Roller. If numEntries
- // is lower than or equals 1, we consider it as a new writer.
- if (this.numEntries.get() <= 1) {
- return;
- }
- // Once the live datanode number and the replicas return to normal,
- // enable the LowReplication-Roller.
- this.lowReplicationRollEnabled = true;
- LOG.info("LowReplication-Roller was enabled.");
- }
- }
- } catch (Exception e) {
- LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
- " still proceeding ahead...");
- }
- }
-
- /**
- * This method gets the datanode replication count for the current HLog.
- *
- * If the pipeline isn't started yet or is empty, you will get the default
- * replication factor. Therefore, if this function returns 0, it means you
- * are not properly running with the HDFS-826 patch.
- * @throws InvocationTargetException
- * @throws IllegalAccessException
- * @throws IllegalArgumentException
- *
- * @throws Exception
- */
- int getLogReplication()
- throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
- if (this.getNumCurrentReplicas != null && this.hdfs_out != null) {
- Object repl = this.getNumCurrentReplicas.invoke(getOutputStream(), NO_ARGS);
- if (repl instanceof Integer) {
- return ((Integer)repl).intValue();
- }
- }
- return 0;
- }
-
- boolean canGetCurReplicas() {
- return this.getNumCurrentReplicas != null;
- }
-
- public void hsync() throws IOException {
- syncer();
- }
-
- public void hflush() throws IOException {
- syncer();
- }
-
- public void sync() throws IOException {
- syncer();
- }
-
- public void sync(long txid) throws IOException {
- syncer(txid);
- }
-
- private void requestLogRoll() {
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i: this.listeners) {
- i.logRollRequested();
- }
- }
- }
-
- protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit,
- HTableDescriptor htd)
- throws IOException {
- if (!this.enabled) {
- return;
- }
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i: this.listeners) {
- i.visitLogEntryBeforeWrite(htd, logKey, logEdit);
- }
- }
- try {
- long now = System.currentTimeMillis();
- // coprocessor hook:
- if (!coprocessorHost.preWALWrite(info, logKey, logEdit)) {
- // write to our buffer for the Hlog file.
- logSyncerThread.append(new HLog.Entry(logKey, logEdit));
- }
- long took = System.currentTimeMillis() - now;
- coprocessorHost.postWALWrite(info, logKey, logEdit);
- writeTime.inc(took);
- long len = 0;
- for (KeyValue kv : logEdit.getKeyValues()) {
- len += kv.getLength();
- }
- writeSize.inc(len);
- if (took > 1000) {
- LOG.warn(String.format(
- "%s took %d ms appending an edit to hlog; editcount=%d, len~=%s",
- Thread.currentThread().getName(), took, this.numEntries.get(),
- StringUtils.humanReadableInt(len)));
- slowHLogAppendCount.incrementAndGet();
- slowHLogAppendTime.inc(took);
- }
- } catch (IOException e) {
- LOG.fatal("Could not append. Requesting close of hlog", e);
- requestLogRoll();
- throw e;
- }
- }
-
-
- /** @return How many items have been added to the log */
- int getNumEntries() {
- return numEntries.get();
- }
+ public void sync(long txid) throws IOException;
/**
* Obtain a log sequence number.
*/
- public long obtainSeqNum() {
- return this.logSeqNum.incrementAndGet();
- }
-
- /** @return the number of log files in use */
- int getNumLogFiles() {
- return outputfiles.size();
- }
-
- private byte[] getSnapshotName(byte[] encodedRegionName) {
- byte snp[] = new byte[encodedRegionName.length + 3];
- // an encoded region name has only hex digits. s, n or p are not hex
- // and therefore snapshot-names will never collide with
- // encoded-region-names
- snp[0] = 's'; snp[1] = 'n'; snp[2] = 'p';
- for (int i = 0; i < encodedRegionName.length; i++) {
- snp[i+3] = encodedRegionName[i];
- }
- return snp;
- }
+ public long obtainSeqNum();
/**
* By acquiring a log sequence ID, we can allow log messages to continue while
* we flush the cache.
- *
+ *
* Acquire a lock so that we do not roll the log between the start and
* completion of a cache-flush. Otherwise the log-seq-id for the flush will
* not appear in the correct logfile.
- *
+ *
* Ensuring that flushes and log-rolls don't happen concurrently also allows
* us to temporarily put a log-seq-number in lastSeqWritten against the region
* being flushed that might not be the earliest in-memory log-seq-number for
* that region. By the time the flush is completed or aborted and before the
* cacheFlushLock is released it is ensured that lastSeqWritten again has the
* oldest in-memory edit's lsn for the region that was being flushed.
- *
+ *
* In this method, by removing the entry in lastSeqWritten for the region
* being flushed we ensure that the next edit inserted in this region will be
- * correctly recorded in {@link #append(HRegionInfo, byte[], WALEdit, long, HTableDescriptor)} The
+ * correctly recorded in
+ * {@link #append(HRegionInfo, byte[], WALEdit, long, HTableDescriptor)} The
* lsn of the earliest in-memory lsn - which is now in the memstore snapshot -
* is saved temporarily in the lastSeqWritten map while the flush is active.
- *
+ *
* @return sequence ID to pass
* {@link #completeCacheFlush(byte[], byte[], long, boolean)} (byte[],
* byte[], long)}
* @see #completeCacheFlush(byte[], byte[], long, boolean)
* @see #abortCacheFlush(byte[])
*/
- public long startCacheFlush(final byte[] encodedRegionName) {
- this.cacheFlushLock.lock();
- Long seq = this.lastSeqWritten.remove(encodedRegionName);
- // seq is the lsn of the oldest edit associated with this region. If a
- // snapshot already exists - because the last flush failed - then seq will
- // be the lsn of the oldest edit in the snapshot
- if (seq != null) {
- // keeping the earliest sequence number of the snapshot in
- // lastSeqWritten maintains the correctness of
- // getOldestOutstandingSeqNum(). But it doesn't matter really because
- // everything is being done inside of cacheFlush lock.
- Long oldseq =
- lastSeqWritten.put(getSnapshotName(encodedRegionName), seq);
- if (oldseq != null) {
- LOG.error("Logic Error Snapshot seq id from earlier flush still" +
- " present! for region " + Bytes.toString(encodedRegionName) +
- " overwritten oldseq=" + oldseq + "with new seq=" + seq);
- Runtime.getRuntime().halt(1);
- }
- }
- return obtainSeqNum();
- }
-
+ public long startCacheFlush(final byte[] encodedRegionName);
/**
* Complete the cache flush
- *
+ *
* Protected by cacheFlushLock
- *
+ *
* @param encodedRegionName
* @param tableName
* @param logSeqId
* @throws IOException
*/
- public void completeCacheFlush(final byte [] encodedRegionName,
- final byte [] tableName, final long logSeqId, final boolean isMetaRegion)
- throws IOException {
- try {
- if (this.closed) {
- return;
- }
- long txid = 0;
- synchronized (updateLock) {
- long now = System.currentTimeMillis();
- WALEdit edit = completeCacheFlushLogEdit();
- HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
- System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
- logSyncerThread.append(new Entry(key, edit));
- txid = this.unflushedEntries.incrementAndGet();
- writeTime.inc(System.currentTimeMillis() - now);
- long len = 0;
- for (KeyValue kv : edit.getKeyValues()) {
- len += kv.getLength();
- }
- writeSize.inc(len);
- this.numEntries.incrementAndGet();
- }
- // sync txn to file system
- this.sync(txid);
-
- } finally {
- // updateLock not needed for removing snapshot's entry
- // Cleaning up of lastSeqWritten is in the finally clause because we
- // don't want to confuse getOldestOutstandingSeqNum()
- this.lastSeqWritten.remove(getSnapshotName(encodedRegionName));
- this.cacheFlushLock.unlock();
- }
- }
-
- private WALEdit completeCacheFlushLogEdit() {
- KeyValue kv = new KeyValue(METAROW, METAFAMILY, null,
- System.currentTimeMillis(), COMPLETE_CACHE_FLUSH);
- WALEdit e = new WALEdit();
- e.add(kv);
- return e;
- }
+ public void completeCacheFlush(final byte[] encodedRegionName,
+ final byte[] tableName, final long logSeqId, final boolean isMetaRegion)
+ throws IOException;
/**
- * Abort a cache flush.
- * Call if the flush fails. Note that the only recovery for an aborted flush
- * currently is a restart of the regionserver so the snapshot content dropped
- * by the failure gets restored to the memstore.
- */
- public void abortCacheFlush(byte[] encodedRegionName) {
- Long snapshot_seq =
- this.lastSeqWritten.remove(getSnapshotName(encodedRegionName));
- if (snapshot_seq != null) {
- // updateLock not necessary because we are racing against
- // lastSeqWritten.putIfAbsent() in append() and we will always win
- // before releasing cacheFlushLock make sure that the region's entry in
- // lastSeqWritten points to the earliest edit in the region
- Long current_memstore_earliest_seq =
- this.lastSeqWritten.put(encodedRegionName, snapshot_seq);
- if (current_memstore_earliest_seq != null &&
- (current_memstore_earliest_seq.longValue() <=
- snapshot_seq.longValue())) {
- LOG.error("Logic Error region " + Bytes.toString(encodedRegionName) +
- "acquired edits out of order current memstore seq=" +
- current_memstore_earliest_seq + " snapshot seq=" + snapshot_seq);
- Runtime.getRuntime().halt(1);
- }
- }
- this.cacheFlushLock.unlock();
- }
+ * Abort a cache flush. Call if the flush fails. Note that the only recovery
+ * for an aborted flush currently is a restart of the regionserver so the
+ * snapshot content dropped by the failure gets restored to the memstore.
+ */
+ public void abortCacheFlush(byte[] encodedRegionName);
/**
- * @param family
- * @return true if the column is a meta column
+ * @return Coprocessor host.
*/
- public static boolean isMetaFamily(byte [] family) {
- return Bytes.equals(METAFAMILY, family);
- }
+ public WALCoprocessorHost getCoprocessorHost();
/**
* Get LowReplication-Roller status
*
* @return lowReplicationRollEnabled
*/
- public boolean isLowReplicationRollEnabled() {
- return lowReplicationRollEnabled;
- }
-
- @SuppressWarnings("unchecked")
- public static Class<? extends HLogKey> getKeyClass(Configuration conf) {
- return (Class<? extends HLogKey>)
- conf.getClass("hbase.regionserver.hlog.keyclass", HLogKey.class);
- }
-
- public static HLogKey newKey(Configuration conf) throws IOException {
- Class<? extends HLogKey> keyClass = getKeyClass(conf);
- try {
- return keyClass.newInstance();
- } catch (InstantiationException e) {
- throw new IOException("cannot create hlog key");
- } catch (IllegalAccessException e) {
- throw new IOException("cannot create hlog key");
- }
- }
-
- /**
- * Utility class that lets us keep track of the edit with it's key
- * Only used when splitting logs
- */
- public static class Entry implements Writable {
- private WALEdit edit;
- private HLogKey key;
-
- public Entry() {
- edit = new WALEdit();
- key = new HLogKey();
- }
-
- /**
- * Constructor for both params
- * @param edit log's edit
- * @param key log's key
- */
- public Entry(HLogKey key, WALEdit edit) {
- super();
- this.key = key;
- this.edit = edit;
- }
- /**
- * Gets the edit
- * @return edit
- */
- public WALEdit getEdit() {
- return edit;
- }
- /**
- * Gets the key
- * @return key
- */
- public HLogKey getKey() {
- return key;
- }
-
- /**
- * Set compression context for this entry.
- * @param compressionContext Compression context
- */
- public void setCompressionContext(CompressionContext compressionContext) {
- edit.setCompressionContext(compressionContext);
- key.setCompressionContext(compressionContext);
- }
-
- @Override
- public String toString() {
- return this.key + "=" + this.edit;
- }
-
- @Override
- public void write(DataOutput dataOutput) throws IOException {
- this.key.write(dataOutput);
- this.edit.write(dataOutput);
- }
-
- @Override
- public void readFields(DataInput dataInput) throws IOException {
- this.key.readFields(dataInput);
- this.edit.readFields(dataInput);
- }
- }
-
- /**
- * Construct the HLog directory name
- *
- * @param serverName Server name formatted as described in {@link ServerName}
- * @return the relative HLog directory name, e.g. <code>.logs/1.example.org,60030,12345</code>
- * if <code>serverName</code> passed is <code>1.example.org,60030,12345</code>
- */
- public static String getHLogDirectoryName(final String serverName) {
- StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME);
- dirName.append("/");
- dirName.append(serverName);
- return dirName.toString();
- }
-
-
- /**
- * @param path - the path to analyze. Expected format, if it's in hlog directory:
- * / [base directory for hbase] / hbase / .logs / ServerName / logfile
- * @return null if it's not a log file. Returns the ServerName of the region server that created
- * this log file otherwise.
- */
- public static ServerName getServerNameFromHLogDirectoryName(Configuration conf, String path)
- throws IOException {
- if (path == null || path.length() <= HConstants.HREGION_LOGDIR_NAME.length()) {
- return null;
- }
-
- if (conf == null) {
- throw new IllegalArgumentException("parameter conf must be set");
- }
-
- final String rootDir = conf.get(HConstants.HBASE_DIR);
- if (rootDir == null || rootDir.isEmpty()) {
- throw new IllegalArgumentException(HConstants.HBASE_DIR + " key not found in conf.");
- }
-
- final StringBuilder startPathSB = new StringBuilder(rootDir);
- if (!rootDir.endsWith("/")) startPathSB.append('/');
- startPathSB.append(HConstants.HREGION_LOGDIR_NAME);
- if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/")) startPathSB.append('/');
- final String startPath = startPathSB.toString();
-
- String fullPath;
- try {
- fullPath = FileSystem.get(conf).makeQualified(new Path(path)).toString();
- } catch (IllegalArgumentException e) {
- LOG.info("Call to makeQualified failed on " + path + " " + e.getMessage());
- return null;
- }
-
- if (!fullPath.startsWith(startPath)) {
- return null;
- }
-
- final String serverNameAndFile = fullPath.substring(startPath.length());
-
- if (serverNameAndFile.indexOf('/') < "a,0,0".length()) {
- // Either it's a file (not a directory) or it's not a ServerName format
- return null;
- }
-
- final String serverName = serverNameAndFile.substring(0, serverNameAndFile.indexOf('/') - 1);
-
- if (!ServerName.isFullServerName(serverName)) {
- return null;
- }
-
- return ServerName.parseServerName(serverName);
- }
-
- /**
- * Get the directory we are making logs in.
- *
- * @return dir
- */
- protected Path getDir() {
- return dir;
- }
-
- /**
- * @param filename name of the file to validate
- * @return <tt>true</tt> if the filename matches an HLog, <tt>false</tt>
- * otherwise
- */
- public static boolean validateHLogFilename(String filename) {
- return pattern.matcher(filename).matches();
- }
-
- static Path getHLogArchivePath(Path oldLogDir, Path p) {
- return new Path(oldLogDir, p.getName());
- }
-
- static String formatRecoveredEditsFileName(final long seqid) {
- return String.format("%019d", seqid);
- }
-
- /**
- * Returns sorted set of edit files made by wal-log splitter, excluding files
- * with '.temp' suffix.
- * @param fs
- * @param regiondir
- * @return Files in passed <code>regiondir</code> as a sorted set.
- * @throws IOException
- */
- public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
- final Path regiondir)
- throws IOException {
- NavigableSet<Path> filesSorted = new TreeSet<Path>();
- Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
- if (!fs.exists(editsdir)) return filesSorted;
- FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
- @Override
- public boolean accept(Path p) {
- boolean result = false;
- try {
- // Return files and only files that match the editfile names pattern.
- // There can be other files in this directory other than edit files.
- // In particular, on error, we'll move aside the bad edit file giving
- // it a timestamp suffix. See moveAsideBadEditsFile.
- Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
- result = fs.isFile(p) && m.matches();
- // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
- // because it means splithlog thread is writting this file.
- if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
- result = false;
- }
- } catch (IOException e) {
- LOG.warn("Failed isFile check on " + p);
- }
- return result;
- }
- });
- if (files == null) return filesSorted;
- for (FileStatus status: files) {
- filesSorted.add(status.getPath());
- }
- return filesSorted;
- }
-
- /**
- * Move aside a bad edits file.
- * @param fs
- * @param edits Edits file to move aside.
- * @return The name of the moved aside file.
- * @throws IOException
- */
- public static Path moveAsideBadEditsFile(final FileSystem fs,
- final Path edits)
- throws IOException {
- Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." +
- System.currentTimeMillis());
- if (!fs.rename(edits, moveAsideName)) {
- LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
- }
- return moveAsideName;
- }
-
- /**
- * @param regiondir This regions directory in the filesystem.
- * @return The directory that holds recovered edits files for the region
- * <code>regiondir</code>
- */
- public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
- return new Path(regiondir, RECOVERED_EDITS_DIR);
- }
-
- public static final long FIXED_OVERHEAD = ClassSize.align(
- ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
- ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
-
- private static void usage() {
- System.err.println("Usage: HLog <ARGS>");
- System.err.println("Arguments:");
- System.err.println(" --dump Dump textual representation of passed one or more files");
- System.err.println(" For example: HLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE");
- System.err.println(" --split Split the passed directory of WAL logs");
- System.err.println(" For example: HLog --split hdfs://example.com:9000/hbase/.logs/DIR");
- }
-
- private static void split(final Configuration conf, final Path p)
- throws IOException {
- FileSystem fs = FileSystem.get(conf);
- if (!fs.exists(p)) {
- throw new FileNotFoundException(p.toString());
- }
- final Path baseDir = new Path(conf.get(HConstants.HBASE_DIR));
- final Path oldLogDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
- if (!fs.getFileStatus(p).isDir()) {
- throw new IOException(p + " is not a directory");
- }
-
- HLogSplitter logSplitter = HLogSplitter.createLogSplitter(
- conf, baseDir, p, oldLogDir, fs);
- logSplitter.splitLog();
- }
-
- /**
- * @return Coprocessor host.
- */
- public WALCoprocessorHost getCoprocessorHost() {
- return coprocessorHost;
- }
-
- /** Provide access to currently deferred sequence num for tests */
- boolean hasDeferredEntries() {
- return lastDeferredTxid > syncedTillHere;
- }
-
- /**
- * Pass one or more log file names and it will either dump out a text version
- * on <code>stdout</code> or split the specified log files.
- *
- * @param args
- * @throws IOException
- */
- public static void main(String[] args) throws IOException {
- if (args.length < 2) {
- usage();
- System.exit(-1);
- }
- // either dump using the HLogPrettyPrinter or split, depending on args
- if (args[0].compareTo("--dump") == 0) {
- HLogPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
- } else if (args[0].compareTo("--split") == 0) {
- Configuration conf = HBaseConfiguration.create();
- for (int i = 1; i < args.length; i++) {
- try {
- conf.set("fs.default.name", args[i]);
- conf.set("fs.defaultFS", args[i]);
- Path logPath = new Path(args[i]);
- split(conf, logPath);
- } catch (Throwable t) {
- t.printStackTrace(System.err);
- System.exit(-1);
- }
- }
- } else {
- usage();
- System.exit(-1);
- }
- }
+ public boolean isLowReplicationRollEnabled();
}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java?rev=1393126&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java Tue Oct 2 19:29:19 2012
@@ -0,0 +1,119 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
+
+public class HLogFactory {
+ private static final Log LOG = LogFactory.getLog(HLogFactory.class);
+
+ public static HLog createHLog(final FileSystem fs, final Path root, final String logName,
+ final Configuration conf) throws IOException {
+ return new FSHLog(fs, root, logName, conf);
+ }
+
+ public static HLog createHLog(final FileSystem fs, final Path root, final String logName,
+ final String oldLogName, final Configuration conf) throws IOException {
+ return new FSHLog(fs, root, logName, oldLogName, conf);
+  }
+
+ public static HLog createHLog(final FileSystem fs, final Path root, final String logName,
+ final Configuration conf, final List<WALActionsListener> listeners,
+ final String prefix) throws IOException {
+ return new FSHLog(fs, root, logName, conf, listeners, prefix);
+ }
+
+ /*
+ * WAL Reader
+ */
+
+ private static Class<? extends Reader> logReaderClass;
+
+ static void resetLogReaderClass() {
+ logReaderClass = null;
+ }
+
+ /**
+ * Create a reader for the WAL.
+ * @return A WAL reader. Close when done with it.
+ * @throws IOException
+ */
+ public static HLog.Reader createReader(final FileSystem fs,
+ final Path path, Configuration conf)
+ throws IOException {
+ try {
+
+ if (logReaderClass == null) {
+
+ logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
+ SequenceFileLogReader.class, Reader.class);
+ }
+
+
+ HLog.Reader reader = logReaderClass.newInstance();
+ reader.init(fs, path, conf);
+ return reader;
+ } catch (IOException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new IOException("Cannot get log reader", e);
+ }
+ }
+
+ /*
+ * WAL writer
+ */
+
+ private static Class<? extends Writer> logWriterClass;
+
+ /**
+ * Create a writer for the WAL.
+ * @return A WAL writer. Close when done with it.
+ * @throws IOException
+ */
+ public static HLog.Writer createWriter(final FileSystem fs,
+ final Path path, Configuration conf)
+ throws IOException {
+ try {
+ if (logWriterClass == null) {
+ logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl",
+ SequenceFileLogWriter.class, Writer.class);
+ }
+ HLog.Writer writer = (HLog.Writer) logWriterClass.newInstance();
+ writer.init(fs, path, conf);
+ return writer;
+ } catch (Exception e) {
+ throw new IOException("cannot get log writer", e);
+ }
+ }
+
+}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogMetrics.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogMetrics.java?rev=1393126&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogMetrics.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogMetrics.java Tue Oct 2 19:29:19 2012
@@ -0,0 +1,83 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public class HLogMetrics {
+
+ public static class Metric {
+ public long min = Long.MAX_VALUE;
+ public long max = 0;
+ public long total = 0;
+ public int count = 0;
+
+ synchronized void inc(final long val) {
+ min = Math.min(min, val);
+ max = Math.max(max, val);
+ total += val;
+ ++count;
+ }
+
+ synchronized Metric get() {
+ Metric copy = new Metric();
+ copy.min = min;
+ copy.max = max;
+ copy.total = total;
+ copy.count = count;
+ this.min = Long.MAX_VALUE;
+ this.max = 0;
+ this.total = 0;
+ this.count = 0;
+ return copy;
+ }
+ }
+
+
+ // For measuring latency of writes
+ static Metric writeTime = new Metric();
+ static Metric writeSize = new Metric();
+ // For measuring latency of syncs
+ static Metric syncTime = new Metric();
+ // For measuring slow HLog appends
+ static AtomicLong slowHLogAppendCount = new AtomicLong();
+ static Metric slowHLogAppendTime = new Metric();
+
+ public static Metric getWriteTime() {
+ return writeTime.get();
+ }
+
+ public static Metric getWriteSize() {
+ return writeSize.get();
+ }
+
+ public static Metric getSyncTime() {
+ return syncTime.get();
+ }
+
+ public static long getSlowAppendCount() {
+ return slowHLogAppendCount.get();
+ }
+
+ public static Metric getSlowAppendTime() {
+ return slowHLogAppendTime.get();
+ }
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogPrettyPrinter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogPrettyPrinter.java?rev=1393126&r1=1393125&r2=1393126&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogPrettyPrinter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogPrettyPrinter.java Tue Oct 2 19:29:19 2012
@@ -240,9 +240,9 @@ public class HLogPrettyPrinter {
out.print("[");
firstTxn = true;
}
- Reader log = HLog.getReader(fs, p, conf);
+ Reader log = HLogFactory.createReader(fs, p, conf);
try {
- HLog.Entry entry;
+ FSHLog.Entry entry;
while ((entry = log.next()) != null) {
HLogKey key = entry.getKey();
WALEdit edit = entry.getEdit();
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1393126&r1=1393125&r2=1393126&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Tue Oct 2 19:29:19 2012
@@ -563,7 +563,7 @@ public class HLogSplitter {
}
for (Path p : processedLogs) {
- Path newPath = HLog.getHLogArchivePath(oldLogDir, p);
+ Path newPath = FSHLog.getHLogArchivePath(oldLogDir, p);
if (fs.exists(p)) {
if (!fs.rename(p, newPath)) {
LOG.warn("Unable to move " + p + " to " + newPath);
@@ -598,7 +598,7 @@ public class HLogSplitter {
Path tableDir = HTableDescriptor.getTableDir(rootDir, logEntry.getKey().getTablename());
Path regiondir = HRegion.getRegionDir(tableDir,
Bytes.toString(logEntry.getKey().getEncodedRegionName()));
- Path dir = HLog.getRegionDirRecoveredEditsDir(regiondir);
+ Path dir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
if (!fs.exists(regiondir)) {
LOG.info("This region's directory doesn't exist: "
@@ -777,7 +777,7 @@ public class HLogSplitter {
*/
protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
throws IOException {
- return HLog.createWriter(fs, logfile, conf);
+ return HLogFactory.createWriter(fs, logfile, conf);
}
/**
@@ -785,7 +785,7 @@ public class HLogSplitter {
*/
protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf)
throws IOException {
- return HLog.getReader(fs, curLogFile, conf);
+ return HLogFactory.createReader(fs, curLogFile, conf);
}
/**