Posted to dev@hbase.apache.org by "Jim Kellerman (POWERSET)" <Ji...@microsoft.com> on 2009/03/06 00:57:42 UTC
FW: svn commit: r750688 - in /hadoop/hbase/branches/0.19: lib/ src/java/org/apache/hadoop/hbase/io/ src/java/org/apache/hadoop/hbase/regionserver/
Undoing commit to the wrong branch. Sorry about that. Fixed now.
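For anyone retracing this later: a commit that lands on the wrong branch is normally backed out with a reverse merge in a clean working copy of that branch (e.g. svn merge -c -N . with N the revision being undone) and then committed. The exact commands used to produce r750688 are not recorded in this message, so treat that invocation as illustrative.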
---
Jim Kellerman, Powerset (Live Search, Microsoft Corporation)
> -----Original Message-----
> From: jimk@apache.org [mailto:jimk@apache.org]
> Sent: Thursday, March 05, 2009 3:50 PM
> To: hbase-commits@hadoop.apache.org
> Subject: svn commit: r750688 - in /hadoop/hbase/branches/0.19: lib/ src/java/org/apache/hadoop/hbase/io/ src/java/org/apache/hadoop/hbase/regionserver/
>
> Author: jimk
> Date: Thu Mar 5 23:49:56 2009
> New Revision: 750688
>
> URL: http://svn.apache.org/viewvc?rev=750688&view=rev
> Log:
> Undo previous commit. Was committed to wrong branch!
>
> Added:
> hadoop/hbase/branches/0.19/lib/hadoop-0.19.1-core.jar (with props)
> hadoop/hbase/branches/0.19/lib/hadoop-0.19.1-test.jar (with props)
> Removed:
> hadoop/hbase/branches/0.19/lib/hadoop-0.19.1-dev-core.jar
> hadoop/hbase/branches/0.19/lib/hadoop-0.19.1-dev-test.jar
> Modified:
>     hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/io/SequenceFile.java
>     hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
>
> Added: hadoop/hbase/branches/0.19/lib/hadoop-0.19.1-core.jar
> URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/lib/hadoop-0.19.1-core.jar?rev=750688&view=auto
> ==============================================================================
> Binary file - no diff available.
>
> Propchange: hadoop/hbase/branches/0.19/lib/hadoop-0.19.1-core.jar
> ------------------------------------------------------------------------------
> svn:mime-type = application/octet-stream
>
> Added: hadoop/hbase/branches/0.19/lib/hadoop-0.19.1-test.jar
> URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/lib/hadoop-0.19.1-test.jar?rev=750688&view=auto
> ==============================================================================
> Binary file - no diff available.
>
> Propchange: hadoop/hbase/branches/0.19/lib/hadoop-0.19.1-test.jar
> ------------------------------------------------------------------------------
> svn:mime-type = application/octet-stream
>
> Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/io/SequenceFile.java
> URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/io/SequenceFile.java?rev=750688&r1=750687&r2=750688&view=diff
> ==============================================================================
> --- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/io/SequenceFile.java (original)
> +++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/io/SequenceFile.java Thu Mar 5 23:49:56 2009
> @@ -770,13 +770,11 @@
> return true;
> }
>
> - @Override
> public int hashCode() {
> assert false : "hashCode not designed";
> return 42; // any arbitrary constant will do
> }
>
> - @Override
> public String toString() {
> StringBuffer sb = new StringBuffer();
> sb.append("size: ").append(this.theMetadata.size()).append("\n");
> @@ -954,16 +952,6 @@
> }
> }
>
> - /**
> - * flush all currently written data to the file system
> - * @throws IOException
> - */
> - public void syncFs() throws IOException {
> - if (out != null) {
> - out.sync(); // flush contents to file system
> - }
> - }
> -
> /** Returns the configuration of this file. */
> Configuration getConf() { return conf; }
>
> @@ -1127,13 +1115,10 @@
>
> }
>
> - @Override
> boolean isCompressed() { return true; }
> - @Override
> boolean isBlockCompressed() { return false; }
>
> /** Append a key/value pair. */
> - @Override
> @SuppressWarnings("unchecked")
> public synchronized void append(Object key, Object val)
> throws IOException {
> @@ -1166,7 +1151,6 @@
> }
>
> /** Append a key/value pair. */
> - @Override
> public synchronized void appendRaw(byte[] keyData, int keyOffset,
> int keyLength, ValueBytes val) throws IOException {
>
> @@ -1256,9 +1240,7 @@
> finalizeFileHeader();
> }
>
> - @Override
> boolean isCompressed() { return true; }
> - @Override
> boolean isBlockCompressed() { return true; }
>
> /** Initialize */
> @@ -1286,7 +1268,6 @@
> }
>
> /** Compress and flush contents to dfs */
> - @Override
> public synchronized void sync() throws IOException {
> if (noBufferedRecords > 0) {
> super.sync();
> @@ -1316,7 +1297,6 @@
> }
>
> /** Close the file. */
> - @Override
> public synchronized void close() throws IOException {
> if (out != null) {
> sync();
> @@ -1325,7 +1305,6 @@
> }
>
> /** Append a key/value pair. */
> - @Override
> @SuppressWarnings("unchecked")
> public synchronized void append(Object key, Object val)
> throws IOException {
> @@ -1358,7 +1337,6 @@
> }
>
> /** Append a key/value pair. */
> - @Override
> public synchronized void appendRaw(byte[] keyData, int keyOffset,
> int keyLength, ValueBytes val) throws IOException {
>
> @@ -1951,7 +1929,6 @@
> * of the value may be computed by calling buffer.getLength() before and
> * after calls to this method. */
> /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
> - @Deprecated
> public synchronized int next(DataOutputBuffer buffer) throws IOException {
> // Unsupported for block-compressed sequence files
> if (blockCompressed) {
> @@ -2233,7 +2210,6 @@
> }
>
> /** Returns the name of the file. */
> - @Override
> public String toString() {
> return file.toString();
> }
> @@ -2822,7 +2798,6 @@
> this.tmpDir = tmpDir;
> this.progress = progress;
> }
> - @Override
> protected boolean lessThan(Object a, Object b) {
> // indicate we're making progress
> if (progress != null) {
> @@ -2958,7 +2933,7 @@
> totalBytes += segmentsToMerge.get(i).segmentLength;
> }
> if (totalBytes != 0) //being paranoid
> - progPerByte = 1.0f / totalBytes;
> + progPerByte = 1.0f / (float)totalBytes;
> //reset factor to what it originally was
> factor = origFactor;
> return this;
> @@ -3080,7 +3055,6 @@
> compareTo(that.segmentPathName.toString());
> }
>
> - @Override
> public boolean equals(Object o) {
> if (!(o instanceof SegmentDescriptor)) {
> return false;
> @@ -3095,7 +3069,6 @@
> return false;
> }
>
> - @Override
> public int hashCode() {
> return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32));
> }
> @@ -3186,7 +3159,6 @@
> /** The default cleanup. Subclasses can override this with a custom
> * cleanup
> */
> - @Override
> public void cleanup() throws IOException {
> super.close();
> if (super.shouldPreserveInput()) return;
>
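For the record, the substantive piece of the SequenceFile.java revert is the sync path: the mistakenly committed patch had added a syncFs() pass-through (the removed hunk above) so callers could force buffered edits down to the file system. The knock-on change shows up in the HLog.java diff below; the two calls are quoted from those hunks, and the comments are only a reading aid, not part of the patch:

    // "-" side of the HLog diff (the state being reverted): HLog.sync()
    // forced edits to the file system through the new pass-through.
    this.writer.syncFs();   // SequenceFile.Writer.syncFs() -> out.sync()

    // "+" side (after this revert): HLog.sync() flushes only the
    // SequenceFile.Writer's own buffers.
    this.writer.sync();
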
> Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
> URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=750688&r1=750687&r2=750688&view=diff
> ==============================================================================
> --- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
> +++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Thu Mar 5 23:49:56 2009
> @@ -1,962 +1,919 @@
> -/**
> - * Copyright 2007 The Apache Software Foundation
> - *
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements. See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership. The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License. You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -package org.apache.hadoop.hbase.regionserver;
> -
> -import java.io.EOFException;
> -import java.io.FileNotFoundException;
> -import java.io.IOException;
> -import java.io.UnsupportedEncodingException;
> -import java.net.URLEncoder;
> -import java.text.SimpleDateFormat;
> -import java.util.Date;
> -import java.util.Collections;
> -import java.util.Map;
> -import java.util.SortedMap;
> -import java.util.TreeMap;
> -import java.util.TreeSet;
> -import java.util.concurrent.locks.Lock;
> -import java.util.concurrent.locks.ReentrantLock;
> -
> -import org.apache.commons.logging.Log;
> -import org.apache.commons.logging.LogFactory;
> -import org.apache.hadoop.conf.Configuration;
> -import org.apache.hadoop.fs.FSDataOutputStream;
> -import org.apache.hadoop.fs.FileStatus;
> -import org.apache.hadoop.fs.FileSystem;
> -import org.apache.hadoop.fs.Path;
> -import org.apache.hadoop.fs.Syncable;
> -import org.apache.hadoop.hbase.HBaseConfiguration;
> -import org.apache.hadoop.hbase.HConstants;
> -import org.apache.hadoop.hbase.HRegionInfo;
> -import org.apache.hadoop.hbase.HServerInfo;
> -import org.apache.hadoop.hbase.HStoreKey;
> -import org.apache.hadoop.hbase.HTableDescriptor;
> -import org.apache.hadoop.hbase.RemoteExceptionHandler;
> -import org.apache.hadoop.hbase.util.Bytes;
> -import org.apache.hadoop.hbase.util.FSUtils;
> +/**
> + * Copyright 2007 The Apache Software Foundation
> + *
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements. See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership. The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.hadoop.hbase.regionserver;
> +
> +import java.io.EOFException;
> +import java.io.FileNotFoundException;
> +import java.io.IOException;
> +import java.io.UnsupportedEncodingException;
> +import java.net.URLEncoder;
> +import java.util.Collections;
> +import java.util.Map;
> +import java.util.SortedMap;
> +import java.util.TreeMap;
> +import java.util.TreeSet;
> +import java.util.concurrent.locks.Lock;
> +import java.util.concurrent.locks.ReentrantLock;
> +
> +import org.apache.commons.logging.Log;
> +import org.apache.commons.logging.LogFactory;
> +import org.apache.hadoop.conf.Configuration;
> +import org.apache.hadoop.fs.FileStatus;
> +import org.apache.hadoop.fs.FileSystem;
> +import org.apache.hadoop.fs.Path;
> +import org.apache.hadoop.fs.Syncable;
> +import org.apache.hadoop.hbase.HBaseConfiguration;
> +import org.apache.hadoop.hbase.HConstants;
> +import org.apache.hadoop.hbase.HRegionInfo;
> +import org.apache.hadoop.hbase.HServerInfo;
> +import org.apache.hadoop.hbase.HStoreKey;
> +import org.apache.hadoop.hbase.HTableDescriptor;
> +import org.apache.hadoop.hbase.RemoteExceptionHandler;
> +import org.apache.hadoop.hbase.util.Bytes;
> +import org.apache.hadoop.hbase.util.FSUtils;
> import org.apache.hadoop.hbase.io.SequenceFile;
> import org.apache.hadoop.hbase.io.SequenceFile.CompressionType;
> import org.apache.hadoop.hbase.io.SequenceFile.Metadata;
> import org.apache.hadoop.hbase.io.SequenceFile.Reader;
> -import org.apache.hadoop.io.compress.DefaultCodec;
> -
> -/**
> - * HLog stores all the edits to the HStore.
> - *
> - * It performs logfile-rolling, so external callers are not aware that the
> - * underlying file is being rolled.
> - *
> - * <p>
> - * A single HLog is used by several HRegions simultaneously.
> - *
> - * <p>
> - * Each HRegion is identified by a unique long <code>int</code>. HRegions do
> - * not need to declare themselves before using the HLog; they simply include
> - * their HRegion-id in the <code>append</code> or
> - * <code>completeCacheFlush</code> calls.
> - *
> - * <p>
> - * An HLog consists of multiple on-disk files, which have a chronological order.
> - * As data is flushed to other (better) on-disk structures, the log becomes
> - * obsolete. We can destroy all the log messages for a given HRegion-id up to
> - * the most-recent CACHEFLUSH message from that HRegion.
> - *
> - * <p>
> - * It's only practical to delete entire files. Thus, we delete an entire on-disk
> - * file F when all of the messages in F have a log-sequence-id that's older
> - * (smaller) than the most-recent CACHEFLUSH message for every HRegion that has
> - * a message in F.
> - *
> - * <p>
> - * Synchronized methods can never execute in parallel. However, between the
> - * start of a cache flush and the completion point, appends are allowed but log
> - * rolling is not. To prevent log rolling taking place during this period, a
> - * separate reentrant lock is used.
> - *
> - */
> -public class HLog implements HConstants, Syncable {
> - private static final Log LOG = LogFactory.getLog(HLog.class);
> - private static final String HLOG_DATFILE = "hlog.dat.";
> - private static final SimpleDateFormat DATE_FORMAT =
> - new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
> - static final byte [] METACOLUMN = Bytes.toBytes("METACOLUMN:");
> - static final byte [] METAROW = Bytes.toBytes("METAROW");
> - final FileSystem fs;
> - final Path dir;
> - final Configuration conf;
> - final LogRollListener listener;
> - private final int maxlogentries;
> - private final long optionalFlushInterval;
> - private final long blocksize;
> - private final int flushlogentries;
> - private volatile int unflushedEntries = 0;
> - private volatile long lastLogFlushTime;
> - final long threadWakeFrequency;
> -
> - /*
> - * Current log file.
> - */
> - SequenceFile.Writer writer;
> -
> - /*
> - * Map of all log files but the current one.
> - */
> - final SortedMap<Long, Path> outputfiles =
> - Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
> -
> - /*
> - * Map of region to last sequence/edit id.
> - */
> - private final Map<byte [], Long> lastSeqWritten = Collections.
> - synchronizedSortedMap(new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR));
> -
> - private volatile boolean closed = false;
> -
> - private final Integer sequenceLock = new Integer(0);
> - private volatile long logSeqNum = 0;
> -
> - private volatile long filenum = 0;
> - private volatile long old_filenum = -1;
> -
> - private volatile int numEntries = 0;
> -
> - // This lock prevents starting a log roll during a cache flush.
> - // synchronized is insufficient because a cache flush spans two method calls.
> - private final Lock cacheFlushLock = new ReentrantLock();
> -
> - // We synchronize on updateLock to prevent updates and to prevent a log roll
> - // during an update
> - private final Integer updateLock = new Integer(0);
> -
> - /*
> - * If more than this many logs, force flush of oldest region to oldest edit
> - * goes to disk. If too many and we crash, then will take forever replaying.
> - * Keep the number of logs tidy.
> - */
> - private final int maxLogs;
> -
> - /**
> - * Create an edit log at the given <code>dir</code> location.
> - *
> - * You should never have to load an existing log. If there is a log at
> - * startup, it should have already been processed and deleted by the time the
> - * HLog object is started up.
> - *
> - * @param fs
> - * @param dir
> - * @param conf
> - * @param listener
> - * @throws IOException
> - */
> - public HLog(final FileSystem fs, final Path dir, final Configuration conf,
> - final LogRollListener listener)
> - throws IOException {
> - super();
> - this.fs = fs;
> - this.dir = dir;
> - this.conf = conf;
> - this.listener = listener;
> - this.maxlogentries =
> - conf.getInt("hbase.regionserver.maxlogentries", 100000);
> - this.flushlogentries =
> - conf.getInt("hbase.regionserver.flushlogentries", 100);
> - this.blocksize =
> - conf.getLong("hbase.regionserver.hlog.blocksize", 64L * 1024L *
> 1024L);
> - this.optionalFlushInterval =
> - conf.getLong("hbase.regionserver.optionallogflushinterval", 10 *
> 1000);
> - this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 *
> 1000);
> - this.lastLogFlushTime = System.currentTimeMillis();
> - if (fs.exists(dir)) {
> - throw new IOException("Target HLog directory already exists: " +
> dir);
> - }
> - fs.mkdirs(dir);
> - this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 64);
> - rollWriter();
> - }
> -
> - /**
> - * Accessor for tests. Not a part of the public API.
> - * @return Current state of the monotonically increasing file id.
> - */
> - public long getFilenum() {
> - return this.filenum;
> - }
> -
> - /**
> - * Get the compression type for the hlog files.
> - * @param c Configuration to use.
> - * @return the kind of compression to use
> - */
> - private static CompressionType getCompressionType(final Configuration c) {
> - String name = c.get("hbase.io.seqfile.compression.type");
> - return name == null? CompressionType.NONE: CompressionType.valueOf(name);
> - }
> -
> - /**
> - * Called by HRegionServer when it opens a new region to ensure that log
> - * sequence numbers are always greater than the latest sequence number of the
> - * region being brought on-line.
> - *
> - * @param newvalue We'll set log edit/sequence number to this value if it
> - * is greater than the current value.
> - */
> - void setSequenceNumber(long newvalue) {
> - synchronized (sequenceLock) {
> - if (newvalue > logSeqNum) {
> - if (LOG.isDebugEnabled()) {
> - LOG.debug("changing sequence number from " + logSeqNum + " to "
> +
> - newvalue);
> - }
> - logSeqNum = newvalue;
> - }
> - }
> - }
> -
> - /**
> - * @return log sequence number
> - */
> - public long getSequenceNumber() {
> - return logSeqNum;
> - }
> -
> - /**
> - * Roll the log writer. That is, start writing log messages to a new file.
> - *
> - * Because a log cannot be rolled during a cache flush, and a cache flush
> - * spans two method calls, a special lock needs to be obtained so that a cache
> - * flush cannot start when the log is being rolled and the log cannot be
> - * rolled during a cache flush.
> - *
> - * <p>Note that this method cannot be synchronized because it is possible that
> - * startCacheFlush runs, obtaining the cacheFlushLock, then this method could
> - * start which would obtain the lock on this but block on obtaining the
> - * cacheFlushLock and then completeCacheFlush could be called which would wait
> - * for the lock on this and consequently never release the cacheFlushLock
> - *
> - * @return If lots of logs, flush the returned region so next time through
> - * we can clean logs. Returns null if nothing to flush.
> - * @throws FailedLogCloseException
> - * @throws IOException
> - */
> - public byte [] rollWriter() throws FailedLogCloseException, IOException {
> - byte [] regionToFlush = null;
> - this.cacheFlushLock.lock();
> - try {
> - if (closed) {
> - return regionToFlush;
> - }
> - synchronized (updateLock) {
> - // Clean up current writer.
> - Path oldFile = cleanupCurrentWriter();
> - // Create a new one.
> - this.old_filenum = this.filenum;
> - this.filenum = System.currentTimeMillis();
> - Path newPath = computeFilename(this.filenum);
> -
> - this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath,
> - HLogKey.class, HLogEdit.class,
> - fs.getConf().getInt("io.file.buffer.size", 4096),
> - fs.getDefaultReplication(), this.blocksize,
> - SequenceFile.CompressionType.NONE, new DefaultCodec(), null,
> - new Metadata());
> -
> - LOG.info((oldFile != null?
> - "Closed " + oldFile + ", entries=" + this.numEntries + ". ": "") +
> - "New log writer: " + FSUtils.getPath(newPath));
> -
> - // Can we delete any of the old log files?
> - if (this.outputfiles.size() > 0) {
> - if (this.lastSeqWritten.size() <= 0) {
> - LOG.debug("Last sequence written is empty. Deleting all old hlogs");
> - // If so, then no new writes have come in since all regions were
> - // flushed (and removed from the lastSeqWritten map). Means can
> - // remove all but currently open log file.
> - for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
> - deleteLogFile(e.getValue(), e.getKey());
> - }
> - this.outputfiles.clear();
> - } else {
> - regionToFlush = cleanOldLogs();
> - }
> - }
> - this.numEntries = 0;
> - updateLock.notifyAll();
> - }
> - } finally {
> - this.cacheFlushLock.unlock();
> - }
> - return regionToFlush;
> - }
> -
> - /*
> - * Clean up old commit logs.
> - * @return If lots of logs, flush the returned region so next time through
> - * we can clean logs. Returns null if nothing to flush.
> - * @throws IOException
> - */
> - private byte [] cleanOldLogs() throws IOException {
> - byte [] regionToFlush = null;
> - Long oldestOutstandingSeqNum = getOldestOutstandingSeqNum();
> - // Get the set of all log files whose final ID is older than or
> - // equal to the oldest pending region operation
> - TreeSet<Long> sequenceNumbers =
> - new TreeSet<Long>(this.outputfiles.headMap(
> - (Long.valueOf(oldestOutstandingSeqNum.longValue() + 1L))).keySet());
> - // Now remove old log files (if any)
> - byte [] oldestRegion = null;
> - if (LOG.isDebugEnabled()) {
> - // Find region associated with oldest key -- helps debugging.
> - oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
> - LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " +
> - " out of total " + this.outputfiles.size() + "; " +
> - "oldest outstanding seqnum is " + oldestOutstandingSeqNum +
> - " from region " + Bytes.toString(oldestRegion));
> - }
> - if (sequenceNumbers.size() > 0) {
> - for (Long seq : sequenceNumbers) {
> - deleteLogFile(this.outputfiles.remove(seq), seq);
> - }
> - }
> - int countOfLogs = this.outputfiles.size() - sequenceNumbers.size();
> - if (countOfLogs > this.maxLogs) {
> - regionToFlush = oldestRegion != null?
> - oldestRegion: getOldestRegion(oldestOutstandingSeqNum);
> - LOG.info("Too many logs: logs=" + countOfLogs + ", maxlogs=" +
> - this.maxLogs + "; forcing flush of region with oldest edits: " +
> - Bytes.toString(regionToFlush));
> - }
> - return regionToFlush;
> - }
> -
> - /*
> - * @return Logs older than this id are safe to remove.
> - */
> - private Long getOldestOutstandingSeqNum() {
> - return Collections.min(this.lastSeqWritten.values());
> - }
> -
> - private byte [] getOldestRegion(final Long oldestOutstandingSeqNum) {
> - byte [] oldestRegion = null;
> - for (Map.Entry<byte [], Long> e: this.lastSeqWritten.entrySet()) {
> - if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) {
> - oldestRegion = e.getKey();
> - break;
> - }
> - }
> - return oldestRegion;
> - }
> -
> - /*
> - * Cleans up current writer closing and adding to outputfiles.
> - * Presumes we're operating inside an updateLock scope.
> - * @return Path to current writer or null if none.
> - * @throws IOException
> - */
> - private Path cleanupCurrentWriter() throws IOException {
> - Path oldFile = null;
> - if (this.writer != null) {
> - // Close the current writer, get a new one.
> - try {
> - this.writer.close();
> - } catch (IOException e) {
> - // Failed close of log file. Means we're losing edits. For now,
> - // shut ourselves down to minimize loss. Alternative is to try and
> - // keep going. See HBASE-930.
> - FailedLogCloseException flce =
> - new FailedLogCloseException("#" + this.filenum);
> - flce.initCause(e);
> - throw e;
> - }
> - oldFile = computeFilename(old_filenum);
> - if (filenum > 0) {
> - synchronized (this.sequenceLock) {
> - this.outputfiles.put(Long.valueOf(this.logSeqNum - 1), oldFile);
> - }
> - }
> - }
> - return oldFile;
> - }
> -
> - private void deleteLogFile(final Path p, final Long seqno) throws IOException {
> - LOG.info("removing old log file " + FSUtils.getPath(p) +
> - " whose highest sequence/edit id is " + seqno);
> - this.fs.delete(p, true);
> - }
> -
> - /**
> - * This is a convenience method that computes a new filename with a given
> - * file-number.
> - * @param fn
> - * @return Path
> - */
> - public Path computeFilename(final long fn) {
> - return new Path(dir, HLOG_DATFILE + fn);
> - }
> -
> - /**
> - * Shut down the log and delete the log directory
> - *
> - * @throws IOException
> - */
> - public void closeAndDelete() throws IOException {
> - close();
> - fs.delete(dir, true);
> - }
> -
> - /**
> - * Shut down the log.
> - *
> - * @throws IOException
> - */
> - public void close() throws IOException {
> - cacheFlushLock.lock();
> - try {
> - synchronized (updateLock) {
> - this.closed = true;
> - if (LOG.isDebugEnabled()) {
> - LOG.debug("closing log writer in " + this.dir.toString());
> - }
> - this.writer.close();
> - updateLock.notifyAll();
> - }
> - } finally {
> - cacheFlushLock.unlock();
> - }
> - }
> -
> - /**
> - * Append a set of edits to the log. Log edits are keyed by regionName,
> - * rowname, and log-sequence-id.
> - *
> - * Later, if we sort by these keys, we obtain all the relevant edits for a
> - * given key-range of the HRegion (TODO). Any edits that do not have a
> - * matching {@link HConstants#COMPLETE_CACHEFLUSH} message can be discarded.
> - *
> - * <p>
> - * Logs cannot be restarted once closed, or once the HLog process dies. Each
> - * time the HLog starts, it must create a new log. This means that other
> - * systems should process the log appropriately upon each startup (and prior
> - * to initializing HLog).
> - *
> - * synchronized prevents appends during the completion of a cache flush or for
> - * the duration of a log roll.
> - *
> - * @param regionName
> - * @param tableName
> - * @param edits
> - * @param sync
> - * @throws IOException
> - */
> - void append(byte [] regionName, byte [] tableName,
> - TreeMap<HStoreKey, byte[]> edits, boolean sync)
> - throws IOException {
> - if (closed) {
> - throw new IOException("Cannot append; log is closed");
> - }
> - synchronized (updateLock) {
> - long seqNum[] = obtainSeqNum(edits.size());
> - // The 'lastSeqWritten' map holds the sequence number of the oldest
> - // write for each region. When the cache is flushed, the entry for the
> - // region being flushed is removed if the sequence number of the flush
> - // is greater than or equal to the value in lastSeqWritten.
> - if (!this.lastSeqWritten.containsKey(regionName)) {
> - this.lastSeqWritten.put(regionName, Long.valueOf(seqNum[0]));
> - }
> - int counter = 0;
> - for (Map.Entry<HStoreKey, byte[]> es : edits.entrySet()) {
> - HStoreKey key = es.getKey();
> - HLogKey logKey =
> - new HLogKey(regionName, tableName, key.getRow(), seqNum[counter++]);
> - HLogEdit logEdit =
> - new HLogEdit(key.getColumn(), es.getValue(), key.getTimestamp());
> - doWrite(logKey, logEdit, sync);
> -
> - this.numEntries++;
> - }
> - updateLock.notifyAll();
> - }
> - if (this.numEntries > this.maxlogentries) {
> - requestLogRoll();
> - }
> - }
> -
> - // This is public only because it implements a method in Syncable.
> - public void sync() throws IOException {
> - if (LOG.isDebugEnabled()) {
> - LOG.debug("Sync-ing " + unflushedEntries + ". Last flush time was:
> " +
> - DATE_FORMAT.format(new Date(lastLogFlushTime)));
> - }
> - lastLogFlushTime = System.currentTimeMillis();
> - this.writer.syncFs();
> - unflushedEntries = 0;
> - }
> -
> - void optionalSync() {
> - if (!this.closed) {
> - synchronized (updateLock) {
> - if (((System.currentTimeMillis() - this.optionalFlushInterval) >
> - this.lastLogFlushTime) && this.unflushedEntries > 0) {
> - try {
> - sync();
> - } catch (IOException e) {
> - LOG.error("Error flushing HLog", e);
> - }
> - }
> - }
> - }
> - }
> -
> - private void requestLogRoll() {
> - if (this.listener != null) {
> - this.listener.logRollRequested();
> - }
> - }
> -
> - private void doWrite(HLogKey logKey, HLogEdit logEdit, boolean sync)
> - throws IOException {
> - try {
> - this.writer.append(logKey, logEdit);
> - if (sync || ++unflushedEntries >= flushlogentries) {
> - sync();
> - }
> - } catch (IOException e) {
> - LOG.fatal("Could not append. Requesting close of log", e);
> - requestLogRoll();
> - throw e;
> - }
> - }
> -
> - /** Append an entry without a row to the log.
> - *
> - * @param regionInfo
> - * @param logEdit
> - * @throws IOException
> - */
> - public void append(HRegionInfo regionInfo, HLogEdit logEdit) throws IOException {
> - this.append(regionInfo, new byte[0], logEdit);
> - }
> -
> - /** Append an entry to the log.
> - *
> - * @param regionInfo
> - * @param row
> - * @param logEdit
> - * @throws IOException
> - */
> - public void append(HRegionInfo regionInfo, byte [] row, HLogEdit logEdit)
> - throws IOException {
> - if (closed) {
> - throw new IOException("Cannot append; log is closed");
> - }
> - byte [] regionName = regionInfo.getRegionName();
> - byte [] tableName = regionInfo.getTableDesc().getName();
> -
> - synchronized (updateLock) {
> - long seqNum = obtainSeqNum();
> - // The 'lastSeqWritten' map holds the sequence number of the oldest
> - // write for each region. When the cache is flushed, the entry for the
> - // region being flushed is removed if the sequence number of the flush
> - // is greater than or equal to the value in lastSeqWritten.
> - if (!this.lastSeqWritten.containsKey(regionName)) {
> - this.lastSeqWritten.put(regionName, Long.valueOf(seqNum));
> - }
> -
> - HLogKey logKey = new HLogKey(regionName, tableName, row, seqNum);
> - boolean sync = regionInfo.isMetaRegion() || regionInfo.isRootRegion();
> - doWrite(logKey, logEdit, sync);
> - this.numEntries++;
> - updateLock.notifyAll();
> - }
> -
> - if (this.numEntries > this.maxlogentries) {
> - if (listener != null) {
> - listener.logRollRequested();
> - }
> - }
> - }
> -
> - /** @return How many items have been added to the log */
> - int getNumEntries() {
> - return numEntries;
> - }
> -
> - /**
> - * Obtain a log sequence number.
> - */
> - private long obtainSeqNum() {
> - long value;
> - synchronized (sequenceLock) {
> - value = logSeqNum++;
> - }
> - return value;
> - }
> -
> - /** @return the number of log files in use */
> - int getNumLogFiles() {
> - return outputfiles.size();
> - }
> -
> - /**
> - * Obtain a specified number of sequence numbers
> - *
> - * @param num number of sequence numbers to obtain
> - * @return array of sequence numbers
> - */
> - private long[] obtainSeqNum(int num) {
> - long[] results = new long[num];
> - synchronized (this.sequenceLock) {
> - for (int i = 0; i < num; i++) {
> - results[i] = this.logSeqNum++;
> - }
> - }
> - return results;
> - }
> -
> - /**
> - * By acquiring a log sequence ID, we can allow log messages to continue while
> - * we flush the cache.
> - *
> - * Acquire a lock so that we do not roll the log between the start and
> - * completion of a cache-flush. Otherwise the log-seq-id for the flush will
> - * not appear in the correct logfile.
> - *
> - * @return sequence ID to pass {@link #completeCacheFlush(Text, Text, long)}
> - * @see #completeCacheFlush(Text, Text, long)
> - * @see #abortCacheFlush()
> - */
> - long startCacheFlush() {
> - this.cacheFlushLock.lock();
> - return obtainSeqNum();
> - }
> -
> - /**
> - * Complete the cache flush
> - *
> - * Protected by cacheFlushLock
> - *
> - * @param regionName
> - * @param tableName
> - * @param logSeqId
> - * @throws IOException
> - */
> - void completeCacheFlush(final byte [] regionName, final byte [] tableName,
> - final long logSeqId) throws IOException {
> -
> - try {
> - if (this.closed) {
> - return;
> - }
> - synchronized (updateLock) {
> - this.writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
> - new HLogEdit(HLog.METACOLUMN, HLogEdit.completeCacheFlush.get(),
> - System.currentTimeMillis()));
> - this.numEntries++;
> - Long seq = this.lastSeqWritten.get(regionName);
> - if (seq != null && logSeqId >= seq.longValue()) {
> - this.lastSeqWritten.remove(regionName);
> - }
> - updateLock.notifyAll();
> - }
> - } finally {
> - this.cacheFlushLock.unlock();
> - }
> - }
> -
> - /**
> - * Abort a cache flush.
> - * Call if the flush fails. Note that the only recovery for an aborted flush
> - * currently is a restart of the regionserver so the snapshot content dropped
> - * by the failure gets restored to the memcache.
> - */
> - void abortCacheFlush() {
> - this.cacheFlushLock.unlock();
> - }
> -
> - /**
> - * @param column
> - * @return true if the column is a meta column
> - */
> - public static boolean isMetaColumn(byte [] column) {
> - return Bytes.equals(METACOLUMN, column);
> - }
> -
> - /**
> - * Split up a bunch of regionserver commit log files that are no longer
> - * being written to, into new files, one per region for region to replay on
> - * startup. Delete the old log files when finished.
> - *
> - * @param rootDir qualified root directory of the HBase instance
> - * @param srcDir Directory of log files to split: e.g.
> - * <code>${ROOTDIR}/log_HOST_PORT</code>
> - * @param fs FileSystem
> - * @param conf HBaseConfiguration
> - * @throws IOException
> - */
> - public static void splitLog(final Path rootDir, final Path srcDir,
> - final FileSystem fs, final Configuration conf)
> - throws IOException {
> - if (!fs.exists(srcDir)) {
> - // Nothing to do
> - return;
> - }
> - FileStatus [] logfiles = fs.listStatus(srcDir);
> - if (logfiles == null || logfiles.length == 0) {
> - // Nothing to do
> - return;
> - }
> - LOG.info("Splitting " + logfiles.length + " log(s) in " +
> - srcDir.toString());
> - splitLog(rootDir, logfiles, fs, conf);
> - try {
> - fs.delete(srcDir, true);
> - } catch (IOException e) {
> - e = RemoteExceptionHandler.checkIOException(e);
> - IOException io = new IOException("Cannot delete: " + srcDir);
> - io.initCause(e);
> - throw io;
> - }
> - LOG.info("log file splitting completed for " + srcDir.toString());
> - }
> -
> - /*
> - * @param rootDir
> - * @param logfiles
> - * @param fs
> - * @param conf
> - * @throws IOException
> - */
> - private static void splitLog(final Path rootDir, final FileStatus [] logfiles,
> - final FileSystem fs, final Configuration conf)
> - throws IOException {
> - Map<byte [], SequenceFile.Writer> logWriters =
> - new TreeMap<byte [], SequenceFile.Writer>(Bytes.BYTES_COMPARATOR);
> -
> - long leaseRecoveryPeriod =
> - conf.getLong("hbase.regionserver.hlog.leaserecoveryperiod", 10000);
> -
> - try {
> - for (int i = 0; i < logfiles.length; i++) {
> - if (LOG.isDebugEnabled()) {
> - LOG.debug("Splitting " + (i + 1) + " of " + logfiles.length +
> ": " +
> - logfiles[i].getPath());
> - }
> - // Recover the file's lease if necessary
> - try {
> - while (true) {
> - try {
> - FSDataOutputStream out = fs.append(logfiles[i].getPath());
> - out.close();
> - break;
> - } catch (IOException e) {
> - e = RemoteExceptionHandler.checkIOException(e);
> - if (e instanceof EOFException) {
> - throw e;
> - }
> - if (LOG.isDebugEnabled()) {
> - LOG.debug("Triggering lease recovery.");
> - }
> - }
> - try {
> - Thread.sleep(leaseRecoveryPeriod);
> - } catch (InterruptedException ex) {
> - // ignore it and try again
> - }
> - }
> - } catch (EOFException e) {
> - // file is empty, skip it
> - continue;
> - }
> - if (logfiles[i].getLen() <= 0) {
> - // File is empty, skip it.
> - continue;
> - }
> - HLogKey key = new HLogKey();
> - HLogEdit val = new HLogEdit();
> - try {
> - SequenceFile.Reader in =
> - new SequenceFile.Reader(fs, logfiles[i].getPath(), conf);
> - try {
> - int count = 0;
> - for (; in.next(key, val); count++) {
> - byte [] tableName = key.getTablename();
> - byte [] regionName = key.getRegionName();
> - SequenceFile.Writer w = logWriters.get(regionName);
> - if (w == null) {
> - Path logfile = new Path(
> - HRegion.getRegionDir(
> - HTableDescriptor.getTableDir(rootDir, tableName),
> - HRegionInfo.encodeRegionName(regionName)),
> - HREGION_OLDLOGFILE_NAME);
> - Path oldlogfile = null;
> - SequenceFile.Reader old = null;
> - if (fs.exists(logfile)) {
> - LOG.warn("Old log file " + logfile +
> - " already exists. Copying existing file to new file");
> - oldlogfile = new Path(logfile.toString() + ".old");
> - fs.rename(logfile, oldlogfile);
> - old = new SequenceFile.Reader(fs, oldlogfile, conf);
> - }
> - w = SequenceFile.createWriter(fs, conf, logfile, HLogKey.class,
> - HLogEdit.class, getCompressionType(conf));
> - // Use copy of regionName; regionName object is reused inside in
> - // HStoreKey.getRegionName so its content changes as we iterate.
> - logWriters.put(regionName, w);
> - if (LOG.isDebugEnabled()) {
> - LOG.debug("Creating new log file writer for path " + logfile +
> - " and region " + Bytes.toString(regionName));
> - }
> -
> - if (old != null) {
> - // Copy from existing log file
> - HLogKey oldkey = new HLogKey();
> - HLogEdit oldval = new HLogEdit();
> - for (; old.next(oldkey, oldval); count++) {
> - if (LOG.isDebugEnabled() && count > 0 && count % 10000 == 0) {
> - LOG.debug("Copied " + count + " edits");
> - }
> - w.append(oldkey, oldval);
> - }
> - old.close();
> - fs.delete(oldlogfile, true);
> - }
> - }
> - w.append(key, val);
> - }
> - if (LOG.isDebugEnabled()) {
> - LOG.debug("Applied " + count + " total edits from " +
> - logfiles[i].getPath().toString());
> - }
> - } catch (IOException e) {
> - e = RemoteExceptionHandler.checkIOException(e);
> - if (!(e instanceof EOFException)) {
> - LOG.warn("Exception processing " + logfiles[i].getPath() +
> - " -- continuing. Possible DATA LOSS!", e);
> - }
> - } finally {
> - try {
> - in.close();
> - } catch (IOException e) {
> - LOG.warn("Close in finally threw exception -- continuing",
> e);
> - }
> - }
> - } catch (IOException e) {
> - e = RemoteExceptionHandler.checkIOException(e);
> - if (e instanceof EOFException) {
> - // No recoverable data in file. Skip it.
> - continue;
> - }
> -
> - } finally {
> - // Delete the input file now so we do not replay edits. We could
> - // have gotten here because of an exception. If so, probably
> - // nothing we can do about it. Replaying it, it could work but we
> - // could be stuck replaying for ever. Just continue though we
> - // could have lost some edits.
> - fs.delete(logfiles[i].getPath(), true);
> - }
> - }
> - } finally {
> - for (SequenceFile.Writer w : logWriters.values()) {
> - w.close();
> - }
> - }
> - }
> -
> - /**
> - * Construct the HLog directory name
> - *
> - * @param info HServerInfo for server
> - * @return the HLog directory name
> - */
> - public static String getHLogDirectoryName(HServerInfo info) {
> - StringBuilder dirName = new StringBuilder("log_");
> - try {
> - dirName.append(URLEncoder.encode(
> - info.getServerAddress().getBindAddress(), UTF8_ENCODING));
> - } catch (UnsupportedEncodingException e) {
> - LOG.error("Error encoding '" +
> info.getServerAddress().getBindAddress()
> - + "'", e);
> - }
> - dirName.append("_");
> - dirName.append(info.getStartCode());
> - dirName.append("_");
> - dirName.append(info.getServerAddress().getPort());
> - return dirName.toString();
> - }
> -
> - private static void usage() {
> - System.err.println("Usage: java org.apache.hbase.HLog" +
> - " {--dump <logfile>... | --split <logdir>...}");
> - }
> -
> - /**
> - * Pass one or more log file names and it will either dump out a text version
> - * on <code>stdout</code> or split the specified log files.
> - *
> - * @param args
> - * @throws IOException
> - */
> - public static void main(String[] args) throws IOException {
> - if (args.length < 2) {
> - usage();
> - System.exit(-1);
> - }
> - boolean dump = true;
> - if (args[0].compareTo("--dump") != 0) {
> - if (args[0].compareTo("--split") == 0) {
> - dump = false;
> -
> - } else {
> - usage();
> - System.exit(-1);
> - }
> - }
> - Configuration conf = new HBaseConfiguration();
> - FileSystem fs = FileSystem.get(conf);
> - Path baseDir = new Path(conf.get(HBASE_DIR));
> -
> - for (int i = 1; i < args.length; i++) {
> - Path logPath = new Path(args[i]);
> - if (!fs.exists(logPath)) {
> - throw new FileNotFoundException(args[i] + " does not exist");
> - }
> - if (dump) {
> - if (!fs.isFile(logPath)) {
> - throw new IOException(args[i] + " is not a file");
> - }
> - Reader log = new SequenceFile.Reader(fs, logPath, conf);
> - try {
> - HLogKey key = new HLogKey();
> - HLogEdit val = new HLogEdit();
> - while (log.next(key, val)) {
> - System.out.println(key.toString() + " " + val.toString());
> - }
> - } finally {
> - log.close();
> - }
> - } else {
> - if (!fs.getFileStatus(logPath).isDir()) {
> - throw new IOException(args[i] + " is not a directory");
> - }
> - splitLog(baseDir, logPath, fs, conf);
> - }
> - }
> - }
> -}
> +import org.apache.hadoop.io.compress.DefaultCodec;
> +
> +/**
> + * HLog stores all the edits to the HStore.
> + *
> + * It performs logfile-rolling, so external callers are not aware that the
> + * underlying file is being rolled.
> + *
> + * <p>
> + * A single HLog is used by several HRegions simultaneously.
> + *
> + * <p>
> + * Each HRegion is identified by a unique long <code>int</code>. HRegions do
> + * not need to declare themselves before using the HLog; they simply include
> + * their HRegion-id in the <code>append</code> or
> + * <code>completeCacheFlush</code> calls.
> + *
> + * <p>
> + * An HLog consists of multiple on-disk files, which have a chronological order.
> + * As data is flushed to other (better) on-disk structures, the log becomes
> + * obsolete. We can destroy all the log messages for a given HRegion-id up to
> + * the most-recent CACHEFLUSH message from that HRegion.
> + *
> + * <p>
> + * It's only practical to delete entire files. Thus, we delete an entire on-disk
> + * file F when all of the messages in F have a log-sequence-id that's older
> + * (smaller) than the most-recent CACHEFLUSH message for every HRegion that has
> + * a message in F.
> + *
> + * <p>
> + * Synchronized methods can never execute in parallel. However, between the
> + * start of a cache flush and the completion point, appends are allowed but log
> + * rolling is not. To prevent log rolling taking place during this period, a
> + * separate reentrant lock is used.
> + *
> + */
> +public class HLog implements HConstants, Syncable {
> + private static final Log LOG = LogFactory.getLog(HLog.class);
> + private static final String HLOG_DATFILE = "hlog.dat.";
> + static final byte [] METACOLUMN = Bytes.toBytes("METACOLUMN:");
> + static final byte [] METAROW = Bytes.toBytes("METAROW");
> + final FileSystem fs;
> + final Path dir;
> + final Configuration conf;
> + final LogRollListener listener;
> + private final int maxlogentries;
> + private final long optionalFlushInterval;
> + private final long blocksize;
> + private final int flushlogentries;
> + private volatile int unflushedEntries = 0;
> + private volatile long lastLogFlushTime;
> + final long threadWakeFrequency;
> +
> + /*
> + * Current log file.
> + */
> + SequenceFile.Writer writer;
> +
> + /*
> + * Map of all log files but the current one.
> + */
> + final SortedMap<Long, Path> outputfiles =
> + Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
> +
> + /*
> + * Map of region to last sequence/edit id.
> + */
> + private final Map<byte [], Long> lastSeqWritten = Collections.
> + synchronizedSortedMap(new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR));
> +
> + private volatile boolean closed = false;
> +
> + private final Integer sequenceLock = new Integer(0);
> + private volatile long logSeqNum = 0;
> +
> + private volatile long filenum = 0;
> + private volatile long old_filenum = -1;
> +
> + private volatile int numEntries = 0;
> +
> + // This lock prevents starting a log roll during a cache flush.
> + // synchronized is insufficient because a cache flush spans two method calls.
> + private final Lock cacheFlushLock = new ReentrantLock();
> +
> + // We synchronize on updateLock to prevent updates and to prevent a log roll
> + // during an update
> + private final Integer updateLock = new Integer(0);
> +
> + /*
> + * If more than this many logs, force flush of oldest region to oldest edit
> + * goes to disk. If too many and we crash, then will take forever replaying.
> + * Keep the number of logs tidy.
> + */
> + private final int maxLogs;
> +
> + /**
> + * Create an edit log at the given <code>dir</code> location.
> + *
> + * You should never have to load an existing log. If there is a log at
> + * startup, it should have already been processed and deleted by the time the
> + * HLog object is started up.
> + *
> + * @param fs
> + * @param dir
> + * @param conf
> + * @param listener
> + * @throws IOException
> + */
> + public HLog(final FileSystem fs, final Path dir, final Configuration conf,
> + final LogRollListener listener)
> + throws IOException {
> + super();
> + this.fs = fs;
> + this.dir = dir;
> + this.conf = conf;
> + this.listener = listener;
> + this.maxlogentries =
> + conf.getInt("hbase.regionserver.maxlogentries", 100000);
> + this.flushlogentries =
> + conf.getInt("hbase.regionserver.flushlogentries", 100);
> + this.blocksize =
> + conf.getLong("hbase.regionserver.hlog.blocksize", 1024L * 1024L);
> + this.optionalFlushInterval =
> + conf.getLong("hbase.regionserver.optionallogflushinterval", 10 *
> 1000);
> + this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 *
> 1000);
> + this.lastLogFlushTime = System.currentTimeMillis();
> + if (fs.exists(dir)) {
> + throw new IOException("Target HLog directory already exists: " +
> dir);
> + }
> + fs.mkdirs(dir);
> + this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 64);
> + rollWriter();
> + }
> +
> + /**
> + * Accessor for tests. Not a part of the public API.
> + * @return Current state of the monotonically increasing file id.
> + */
> + public long getFilenum() {
> + return this.filenum;
> + }
> +
> + /**
> + * Get the compression type for the hlog files.
> + * @param c Configuration to use.
> + * @return the kind of compression to use
> + */
> + private static CompressionType getCompressionType(final Configuration c) {
> + String name = c.get("hbase.io.seqfile.compression.type");
> + return name == null? CompressionType.NONE: CompressionType.valueOf(name);
> + }
> +
> + /**
> + * Called by HRegionServer when it opens a new region to ensure that log
> + * sequence numbers are always greater than the latest sequence number of the
> + * region being brought on-line.
> + *
> + * @param newvalue We'll set log edit/sequence number to this value if it
> + * is greater than the current value.
> + */
> + void setSequenceNumber(long newvalue) {
> + synchronized (sequenceLock) {
> + if (newvalue > logSeqNum) {
> + if (LOG.isDebugEnabled()) {
> + LOG.debug("changing sequence number from " + logSeqNum + " to "
> +
> + newvalue);
> + }
> + logSeqNum = newvalue;
> + }
> + }
> + }
> +
> + /**
> + * @return log sequence number
> + */
> + public long getSequenceNumber() {
> + return logSeqNum;
> + }
> +
> + /**
> + * Roll the log writer. That is, start writing log messages to a new file.
> + *
> + * Because a log cannot be rolled during a cache flush, and a cache flush
> + * spans two method calls, a special lock needs to be obtained so that a cache
> + * flush cannot start when the log is being rolled and the log cannot be
> + * rolled during a cache flush.
> + *
> + * <p>Note that this method cannot be synchronized because it is possible that
> + * startCacheFlush runs, obtaining the cacheFlushLock, then this method could
> + * start which would obtain the lock on this but block on obtaining the
> + * cacheFlushLock and then completeCacheFlush could be called which would wait
> + * for the lock on this and consequently never release the cacheFlushLock
> + *
> + * @return If lots of logs, flush the returned region so next time through
> + * we can clean logs. Returns null if nothing to flush.
> + * @throws FailedLogCloseException
> + * @throws IOException
> + */
> + public byte [] rollWriter() throws FailedLogCloseException, IOException {
> + byte [] regionToFlush = null;
> + this.cacheFlushLock.lock();
> + try {
> + if (closed) {
> + return regionToFlush;
> + }
> + synchronized (updateLock) {
> + // Clean up current writer.
> + Path oldFile = cleanupCurrentWriter();
> + // Create a new one.
> + this.old_filenum = this.filenum;
> + this.filenum = System.currentTimeMillis();
> + Path newPath = computeFilename(this.filenum);
> +
> + this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath,
> + HLogKey.class, HLogEdit.class,
> + fs.getConf().getInt("io.file.buffer.size", 4096),
> + fs.getDefaultReplication(), this.blocksize,
> + SequenceFile.CompressionType.NONE, new DefaultCodec(), null,
> + new Metadata());
> +
> + LOG.info((oldFile != null?
> + "Closed " + oldFile + ", entries=" + this.numEntries + ". ": "") +
> + "New log writer: " + FSUtils.getPath(newPath));
> +
> + // Can we delete any of the old log files?
> + if (this.outputfiles.size() > 0) {
> + if (this.lastSeqWritten.size() <= 0) {
> + LOG.debug("Last sequence written is empty. Deleting all old hlogs");
> + // If so, then no new writes have come in since all regions were
> + // flushed (and removed from the lastSeqWritten map). Means can
> + // remove all but currently open log file.
> + for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
> + deleteLogFile(e.getValue(), e.getKey());
> + }
> + this.outputfiles.clear();
> + } else {
> + regionToFlush = cleanOldLogs();
> + }
> + }
> + this.numEntries = 0;
> + updateLock.notifyAll();
> + }
> + } finally {
> + this.cacheFlushLock.unlock();
> + }
> + return regionToFlush;
> + }
> +
> + /*
> + * Clean up old commit logs.
> + * @return If lots of logs, flush the returned region so next time through
> + * we can clean logs. Returns null if nothing to flush.
> + * @throws IOException
> + */
> + private byte [] cleanOldLogs() throws IOException {
> + byte [] regionToFlush = null;
> + Long oldestOutstandingSeqNum = getOldestOutstandingSeqNum();
> + // Get the set of all log files whose final ID is older than or
> + // equal to the oldest pending region operation
> + TreeSet<Long> sequenceNumbers =
> + new TreeSet<Long>(this.outputfiles.headMap(
> + (Long.valueOf(oldestOutstandingSeqNum.longValue() + 1L))).keySet());
> + // Now remove old log files (if any)
> + byte [] oldestRegion = null;
> + if (LOG.isDebugEnabled()) {
> + // Find region associated with oldest key -- helps debugging.
> + oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
> + LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " +
> + " out of total " + this.outputfiles.size() + "; " +
> + "oldest outstanding seqnum is " + oldestOutstandingSeqNum +
> + " from region " + Bytes.toString(oldestRegion));
> + }
> + if (sequenceNumbers.size() > 0) {
> + for (Long seq : sequenceNumbers) {
> + deleteLogFile(this.outputfiles.remove(seq), seq);
> + }
> + }
> + int countOfLogs = this.outputfiles.size() - sequenceNumbers.size();
> + if (countOfLogs > this.maxLogs) {
> + regionToFlush = oldestRegion != null?
> + oldestRegion: getOldestRegion(oldestOutstandingSeqNum);
> + LOG.info("Too many logs: logs=" + countOfLogs + ", maxlogs=" +
> + this.maxLogs + "; forcing flush of region with oldest edits: " +
> + Bytes.toString(regionToFlush));
> + }
> + return regionToFlush;
> + }
> +
> + /*
> + * @return Logs older than this id are safe to remove.
> + */
> + private Long getOldestOutstandingSeqNum() {
> + return Collections.min(this.lastSeqWritten.values());
> + }
> +
> + private byte [] getOldestRegion(final Long oldestOutstandingSeqNum) {
> + byte [] oldestRegion = null;
> + for (Map.Entry<byte [], Long> e: this.lastSeqWritten.entrySet()) {
> + if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) {
> + oldestRegion = e.getKey();
> + break;
> + }
> + }
> + return oldestRegion;
> + }
> +
> + /*
> + * Cleans up current writer closing and adding to outputfiles.
> + * Presumes we're operating inside an updateLock scope.
> + * @return Path to current writer or null if none.
> + * @throws IOException
> + */
> + private Path cleanupCurrentWriter() throws IOException {
> + Path oldFile = null;
> + if (this.writer != null) {
> + // Close the current writer, get a new one.
> + try {
> + this.writer.close();
> + } catch (IOException e) {
> + // Failed close of log file. Means we're losing edits. For now,
> + // shut ourselves down to minimize loss. Alternative is to try and
> + // keep going. See HBASE-930.
> + FailedLogCloseException flce =
> + new FailedLogCloseException("#" + this.filenum);
> + flce.initCause(e);
> + throw e;
> + }
> + oldFile = computeFilename(old_filenum);
> + if (filenum > 0) {
> + synchronized (this.sequenceLock) {
> + this.outputfiles.put(Long.valueOf(this.logSeqNum - 1), oldFile);
> + }
> + }
> + }
> + return oldFile;
> + }
> +
> + private void deleteLogFile(final Path p, final Long seqno) throws IOException {
> + LOG.info("removing old log file " + FSUtils.getPath(p) +
> + " whose highest sequence/edit id is " + seqno);
> + this.fs.delete(p, true);
> + }
> +
> + /**
> + * This is a convenience method that computes a new filename with a given
> + * file-number.
> + * @param fn
> + * @return Path
> + */
> + public Path computeFilename(final long fn) {
> + return new Path(dir, HLOG_DATFILE + fn);
> + }
> +
> + /**
> + * Shut down the log and delete the log directory
> + *
> + * @throws IOException
> + */
> + public void closeAndDelete() throws IOException {
> + close();
> + fs.delete(dir, true);
> + }
> +
> + /**
> + * Shut down the log.
> + *
> + * @throws IOException
> + */
> + public void close() throws IOException {
> + cacheFlushLock.lock();
> + try {
> + synchronized (updateLock) {
> + this.closed = true;
> + if (LOG.isDebugEnabled()) {
> + LOG.debug("closing log writer in " + this.dir.toString());
> + }
> + this.writer.close();
> + updateLock.notifyAll();
> + }
> + } finally {
> + cacheFlushLock.unlock();
> + }
> + }
> +
> + /**
> + * Append a set of edits to the log. Log edits are keyed by regionName,
> + * rowname, and log-sequence-id.
> + *
> + * Later, if we sort by these keys, we obtain all the relevant edits for a
> + * given key-range of the HRegion (TODO). Any edits that do not have a
> + * matching {@link HConstants#COMPLETE_CACHEFLUSH} message can be discarded.
> + *
> + * <p>
> + * Logs cannot be restarted once closed, or once the HLog process dies. Each
> + * time the HLog starts, it must create a new log. This means that other
> + * systems should process the log appropriately upon each startup (and prior
> + * to initializing HLog).
> + *
> + * synchronized prevents appends during the completion of a cache flush or for
> + * the duration of a log roll.
> + *
> + * @param regionName
> + * @param tableName
> + * @param edits
> + * @param sync
> + * @throws IOException
> + */
> + void append(byte [] regionName, byte [] tableName,
> + TreeMap<HStoreKey, byte[]> edits, boolean sync)
> + throws IOException {
> + if (closed) {
> + throw new IOException("Cannot append; log is closed");
> + }
> + synchronized (updateLock) {
> + long seqNum[] = obtainSeqNum(edits.size());
> + // The 'lastSeqWritten' map holds the sequence number of the oldest
> + // write for each region. When the cache is flushed, the entry for the
> + // region being flushed is removed if the sequence number of the flush
> + // is greater than or equal to the value in lastSeqWritten.
> + if (!this.lastSeqWritten.containsKey(regionName)) {
> + this.lastSeqWritten.put(regionName, Long.valueOf(seqNum[0]));
> + }
> + int counter = 0;
> + for (Map.Entry<HStoreKey, byte[]> es : edits.entrySet()) {
> + HStoreKey key = es.getKey();
> + HLogKey logKey =
> + new HLogKey(regionName, tableName, key.getRow(), seqNum[counter++]);
> + HLogEdit logEdit =
> + new HLogEdit(key.getColumn(), es.getValue(), key.getTimestamp());
> + doWrite(logKey, logEdit, sync);
> +
> + this.numEntries++;
> + }
> + updateLock.notifyAll();
> + }
> + if (this.numEntries > this.maxlogentries) {
> + requestLogRoll();
> + }
> + }
> +
> + public void sync() throws IOException {
> + lastLogFlushTime = System.currentTimeMillis();
> + this.writer.sync();
> + unflushedEntries = 0;
> + }
> +
> + void optionalSync() {
> + if (!this.closed) {
> + synchronized (updateLock) {
> + if (this.unflushedEntries > 0 &&
> + (System.currentTimeMillis() - this.lastLogFlushTime) >
> + this.optionalFlushInterval) {
> + try {
> + sync();
> + } catch (IOException e) {
> + LOG.error("Error flushing HLog", e);
> + }
> + }
> + }
> + }
> + }
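Since optionalSync() is a no-op unless there are unflushed entries older than
optionalFlushInterval, it is safe to drive from a timer. A sketch, assuming a
same-package caller with a final HLog named log in scope:

    // Sketch: nudge idle logs to the filesystem about once a second.
    java.util.Timer timer = new java.util.Timer(true);  // daemon thread
    timer.schedule(new java.util.TimerTask() {
      public void run() {
        log.optionalSync();  // no-op unless entries are waiting
      }
    }, 1000, 1000);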
> +
> + private void requestLogRoll() {
> + if (this.listener != null) {
> + this.listener.logRollRequested();
> + }
> + }
> +
> + private void doWrite(HLogKey logKey, HLogEdit logEdit, boolean sync)
> + throws IOException {
> + try {
> + this.writer.append(logKey, logEdit);
> + if (sync || ++unflushedEntries >= flushlogentries) {
> + sync();
> + }
> + } catch (IOException e) {
> + LOG.fatal("Could not append. Requesting close of log", e);
> + requestLogRoll();
> + throw e;
> + }
> + }
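In other words, doWrite() is a simple group commit: with flushlogentries set
to 100, appends 1 through 99 buffer without touching the filesystem, and the
100th append (or any append with sync=true) triggers one sync() covering all
of them.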
> +
> + /** Append an entry without a row to the log.
> + *
> + * @param regionInfo
> + * @param logEdit
> + * @throws IOException
> + */
> + public void append(HRegionInfo regionInfo, HLogEdit logEdit)
> + throws IOException {
> + this.append(regionInfo, new byte[0], logEdit);
> + }
> +
> + /** Append an entry to the log.
> + *
> + * @param regionInfo
> + * @param row
> + * @param logEdit
> + * @throws IOException
> + */
> + public void append(HRegionInfo regionInfo, byte [] row, HLogEdit logEdit)
> + throws IOException {
> + if (closed) {
> + throw new IOException("Cannot append; log is closed");
> + }
> + byte [] regionName = regionInfo.getRegionName();
> + byte [] tableName = regionInfo.getTableDesc().getName();
> +
> + synchronized (updateLock) {
> + long seqNum = obtainSeqNum();
> + // The 'lastSeqWritten' map holds the sequence number of the oldest
> + // write for each region. When the cache is flushed, the entry for the
> + // region being flushed is removed if the sequence number of the flush
> + // is greater than or equal to the value in lastSeqWritten.
> + if (!this.lastSeqWritten.containsKey(regionName)) {
> + this.lastSeqWritten.put(regionName, Long.valueOf(seqNum));
> + }
> +
> + HLogKey logKey = new HLogKey(regionName, tableName, row, seqNum);
> + boolean sync = regionInfo.isMetaRegion() || regionInfo.isRootRegion();
> + doWrite(logKey, logEdit, sync);
> + this.numEntries++;
> + updateLock.notifyAll();
> + }
> +
> + if (this.numEntries > this.maxlogentries) {
> + requestLogRoll();
> + }
> + }
> +
> + /** @return How many items have been added to the log */
> + int getNumEntries() {
> + return numEntries;
> + }
> +
> + /**
> + * Obtain a log sequence number.
> + */
> + private long obtainSeqNum() {
> + long value;
> + synchronized (sequenceLock) {
> + value = logSeqNum++;
> + }
> + return value;
> + }
> +
> + /** @return the number of log files in use */
> + int getNumLogFiles() {
> + return outputfiles.size();
> + }
> +
> + /**
> + * Obtain a specified number of sequence numbers
> + *
> + * @param num number of sequence numbers to obtain
> + * @return array of sequence numbers
> + */
> + private long[] obtainSeqNum(int num) {
> + long[] results = new long[num];
> + synchronized (this.sequenceLock) {
> + for (int i = 0; i < num; i++) {
> + results[i] = this.logSeqNum++;
> + }
> + }
> + return results;
> + }
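Both obtainSeqNum variants reserve ids under sequenceLock, so a batch always
gets a contiguous block. For comparison only (not code from the patch), the
same reservation can be written without an explicit lock:

    import java.util.concurrent.atomic.AtomicLong;

    class SequenceIdSource {
      private final AtomicLong seq = new AtomicLong();

      // Reserve num consecutive sequence ids; returned in ascending order.
      long[] reserve(final int num) {
        final long first = seq.getAndAdd(num);
        final long[] ids = new long[num];
        for (int i = 0; i < num; i++) {
          ids[i] = first + i;
        }
        return ids;
      }
    }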
> +
> + /**
> + * By acquiring a log sequence ID, we can allow log messages to continue
> + * while we flush the cache.
> + *
> + * Acquire a lock so that we do not roll the log between the start and
> + * completion of a cache-flush. Otherwise the log-seq-id for the flush will
> + * not appear in the correct logfile.
> + *
> + * @return sequence ID to pass
> + * {@link #completeCacheFlush(byte[], byte[], long)}
> + * @see #completeCacheFlush(byte[], byte[], long)
> + * @see #abortCacheFlush()
> + */
> + long startCacheFlush() {
> + this.cacheFlushLock.lock();
> + return obtainSeqNum();
> + }
> +
> + /**
> + * Complete the cache flush
> + *
> + * Protected by cacheFlushLock
> + *
> + * @param regionName
> + * @param tableName
> + * @param logSeqId
> + * @throws IOException
> + */
> + void completeCacheFlush(final byte [] regionName, final byte [] tableName,
> + final long logSeqId) throws IOException {
> +
> + try {
> + if (this.closed) {
> + return;
> + }
> + synchronized (updateLock) {
> + this.writer.append(
> + new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
> + new HLogEdit(HLog.METACOLUMN, HLogEdit.completeCacheFlush.get(),
> + System.currentTimeMillis()));
> + this.numEntries++;
> + Long seq = this.lastSeqWritten.get(regionName);
> + if (seq != null && logSeqId >= seq.longValue()) {
> + this.lastSeqWritten.remove(regionName);
> + }
> + updateLock.notifyAll();
> + }
> + } finally {
> + this.cacheFlushLock.unlock();
> + }
> + }
> +
> + /**
> + * Abort a cache flush.
> + * Call if the flush fails. Note that the only recovery for an aborted flush
> + * currently is a restart of the regionserver, so the snapshot content
> + * dropped by the failure gets restored to the memcache.
> + */
> + void abortCacheFlush() {
> + this.cacheFlushLock.unlock();
> + }
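Taken together, startCacheFlush/completeCacheFlush/abortCacheFlush form a
lock-bracketed protocol. A sketch of the intended call pattern;
flushMemcacheToDisk() is a hypothetical stand-in for the region's own flush
work:

    static void flushRegion(final HLog log, final byte [] regionName,
        final byte [] tableName) throws IOException {
      final long seqId = log.startCacheFlush();  // takes cacheFlushLock
      boolean flushed = false;
      try {
        flushMemcacheToDisk();  // hypothetical region-side work
        flushed = true;
      } finally {
        if (flushed) {
          // appends the COMPLETE_CACHEFLUSH marker, then releases the lock
          log.completeCacheFlush(regionName, tableName, seqId);
        } else {
          log.abortCacheFlush();  // releases the lock only
        }
      }
    }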
> +
> + /**
> + * @param column
> + * @return true if the column is a meta column
> + */
> + public static boolean isMetaColumn(byte [] column) {
> + return Bytes.equals(METACOLUMN, column);
> + }
> +
> + /**
> + * Split up a bunch of regionserver commit log files that are no longer
> + * being written to, into new files, one per region, for the region to
> + * replay on startup. Delete the old log files when finished.
> + *
> + * @param rootDir qualified root directory of the HBase instance
> + * @param srcDir Directory of log files to split: e.g.
> + * <code>${ROOTDIR}/log_HOST_PORT</code>
> + * @param fs FileSystem
> + * @param conf HBaseConfiguration
> + * @throws IOException
> + */
> + public static void splitLog(final Path rootDir, final Path srcDir,
> + final FileSystem fs, final Configuration conf)
> + throws IOException {
> + if (!fs.exists(srcDir)) {
> + // Nothing to do
> + return;
> + }
> + FileStatus [] logfiles = fs.listStatus(srcDir);
> + if (logfiles == null || logfiles.length == 0) {
> + // Nothing to do
> + return;
> + }
> + LOG.info("Splitting " + logfiles.length + " log(s) in " +
> + srcDir.toString());
> + splitLog(rootDir, logfiles, fs, conf);
> + try {
> + fs.delete(srcDir, true);
> + } catch (IOException e) {
> + e = RemoteExceptionHandler.checkIOException(e);
> + IOException io = new IOException("Cannot delete: " + srcDir);
> + io.initCause(e);
> + throw io;
> + }
> + LOG.info("log file splitting completed for " + srcDir.toString());
> + }
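For example, a master noticing a dead regionserver might call it like this
(the directory name is made up; HBASE_DIR is the same constant main() uses
below):

    Configuration conf = new HBaseConfiguration();
    FileSystem fs = FileSystem.get(conf);
    Path rootDir = new Path(conf.get(HBASE_DIR));
    // name format matches getHLogDirectoryName() further down
    Path oldLogDir = new Path(rootDir, "log_10.0.0.5_1236294099999_60020");
    HLog.splitLog(rootDir, oldLogDir, fs, conf);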
> +
> + /*
> + * @param rootDir
> + * @param logfiles
> + * @param fs
> + * @param conf
> + * @throws IOException
> + */
> + private static void splitLog(final Path rootDir,
> + final FileStatus [] logfiles, final FileSystem fs,
> + final Configuration conf)
> + throws IOException {
> + Map<byte [], SequenceFile.Writer> logWriters =
> + new TreeMap<byte [], SequenceFile.Writer>(Bytes.BYTES_COMPARATOR);
> + try {
> + for (int i = 0; i < logfiles.length; i++) {
> + if (LOG.isDebugEnabled()) {
> + LOG.debug("Splitting " + (i + 1) + " of " + logfiles.length +
> ": " +
> + logfiles[i].getPath());
> + }
> + // Check for possibly empty file. With appends, currently Hadoop reports
> + // a zero length even if the file has been sync'd. Revisit if
> + // HADOOP-4751 is committed.
> + boolean possiblyEmpty = logfiles[i].getLen() <= 0;
> + HLogKey key = new HLogKey();
> + HLogEdit val = new HLogEdit();
> + try {
> + SequenceFile.Reader in =
> + new SequenceFile.Reader(fs, logfiles[i].getPath(), conf);
> + try {
> + int count = 0;
> + for (; in.next(key, val); count++) {
> + byte [] tableName = key.getTablename();
> + byte [] regionName = key.getRegionName();
> + SequenceFile.Writer w = logWriters.get(regionName);
> + if (w == null) {
> + Path logfile = new Path(
> + HRegion.getRegionDir(
> + HTableDescriptor.getTableDir(rootDir, tableName),
> + HRegionInfo.encodeRegionName(regionName)),
> + HREGION_OLDLOGFILE_NAME);
> + Path oldlogfile = null;
> + SequenceFile.Reader old = null;
> + if (fs.exists(logfile)) {
> + LOG.warn("Old log file " + logfile +
> + " already exists. Copying existing file to new file");
> + oldlogfile = new Path(logfile.toString() + ".old");
> + fs.rename(logfile, oldlogfile);
> + old = new SequenceFile.Reader(fs, oldlogfile, conf);
> + }
> + w = SequenceFile.createWriter(fs, conf, logfile, HLogKey.class,
> + HLogEdit.class, getCompressionType(conf));
> + // Use a copy of regionName; the regionName object is reused inside
> + // HStoreKey.getRegionName, so its content changes as we iterate.
> + logWriters.put(regionName, w);
> + if (LOG.isDebugEnabled()) {
> + LOG.debug("Creating new log file writer for path " +
> logfile +
> + " and region " + Bytes.toString(regionName));
> + }
> +
> + if (old != null) {
> + // Copy from existing log file
> + HLogKey oldkey = new HLogKey();
> + HLogEdit oldval = new HLogEdit();
> + for (; old.next(oldkey, oldval); count++) {
> + if (LOG.isDebugEnabled() && count > 0 && count % 10000 == 0) {
> + LOG.debug("Copied " + count + " edits");
> + }
> + w.append(oldkey, oldval);
> + }
> + old.close();
> + fs.delete(oldlogfile, true);
> + }
> + }
> + w.append(key, val);
> + }
> + if (LOG.isDebugEnabled()) {
> + LOG.debug("Applied " + count + " total edits from " +
> + logfiles[i].getPath().toString());
> + }
> + } catch (IOException e) {
> + e = RemoteExceptionHandler.checkIOException(e);
> + if (!(e instanceof EOFException)) {
> + LOG.warn("Exception processing " + logfiles[i].getPath() +
> + " -- continuing. Possible DATA LOSS!", e);
> + }
> + } finally {
> + try {
> + in.close();
> + } catch (IOException e) {
> + LOG.warn("Close in finally threw exception -- continuing",
> e);
> + }
> + // Delete the input file now so we do not replay its edits. We could
> + // have gotten here because of an exception; if so, there is probably
> + // nothing we can do about it. Replaying might work, but we could also
> + // be stuck replaying forever. Just continue, though we may have lost
> + // some edits.
> + fs.delete(logfiles[i].getPath(), true);
> + }
> + } catch (IOException e) {
> + if (possiblyEmpty) {
> + continue;
> + }
> + throw e;
> + }
> + }
> + } finally {
> + for (SequenceFile.Writer w : logWriters.values()) {
> + w.close();
> + }
> + }
> + }
> +
> + /**
> + * Construct the HLog directory name
> + *
> + * @param info HServerInfo for server
> + * @return the HLog directory name
> + */
> + public static String getHLogDirectoryName(HServerInfo info) {
> + StringBuilder dirName = new StringBuilder("log_");
> + try {
> + dirName.append(URLEncoder.encode(
> + info.getServerAddress().getBindAddress(), UTF8_ENCODING));
> + } catch (UnsupportedEncodingException e) {
> + LOG.error("Error encoding '" +
> info.getServerAddress().getBindAddress()
> + + "'", e);
> + }
> + dirName.append("_");
> + dirName.append(info.getStartCode());
> + dirName.append("_");
> + dirName.append(info.getServerAddress().getPort());
> + return dirName.toString();
> + }
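For example, a server bound to 10.0.0.5 with start code 1236294099999 and
port 60020 gets the directory name log_10.0.0.5_1236294099999_60020: address
first, then start code, then port.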
> +
> + private static void usage() {
> + System.err.println("Usage: java org.apache.hbase.HLog" +
> + " {--dump <logfile>... | --split <logdir>...}");
> + }
> +
> + /**
> + * Pass one or more log file names and it will either dump out a text
> + * version on <code>stdout</code> or split the specified log files.
> + *
> + * @param args
> + * @throws IOException
> + */
> + public static void main(String[] args) throws IOException {
> + if (args.length < 2) {
> + usage();
> + System.exit(-1);
> + }
> + boolean dump = true;
> + if (args[0].compareTo("--dump") != 0) {
> + if (args[0].compareTo("--split") == 0) {
> + dump = false;
> + } else {
> + usage();
> + System.exit(-1);
> + }
> + }
> + Configuration conf = new HBaseConfiguration();
> + FileSystem fs = FileSystem.get(conf);
> + Path baseDir = new Path(conf.get(HBASE_DIR));
> +
> + for (int i = 1; i < args.length; i++) {
> + Path logPath = new Path(args[i]);
> + if (!fs.exists(logPath)) {
> + throw new FileNotFoundException(args[i] + " does not exist");
> + }
> + if (dump) {
> + if (!fs.isFile(logPath)) {
> + throw new IOException(args[i] + " is not a file");
> + }
> + Reader log = new SequenceFile.Reader(fs, logPath, conf);
> + try {
> + HLogKey key = new HLogKey();
> + HLogEdit val = new HLogEdit();
> + while (log.next(key, val)) {
> + System.out.println(key.toString() + " " + val.toString());
> + }
> + } finally {
> + log.close();
> + }
> + } else {
> + if (!fs.getFileStatus(logPath).isDir()) {
> + throw new IOException(args[i] + " is not a directory");
> + }
> + splitLog(baseDir, logPath, fs, conf);
> + }
> + }
> + }
> +}
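Illustrative invocations (classpath setup omitted; the paths and file name
are made up):

    java org.apache.hadoop.hbase.regionserver.HLog --dump \
        /hbase/log_10.0.0.5_1236294099999_60020/hlog.dat.0
    java org.apache.hadoop.hbase.regionserver.HLog --split \
        /hbase/log_10.0.0.5_1236294099999_60020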
>
>