You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2010/07/17 00:28:21 UTC
svn commit: r964965 - in /hbase/trunk: ./
src/main/java/org/apache/hadoop/hbase/regionserver/
src/main/java/org/apache/hadoop/hbase/regionserver/wal/
src/test/java/org/apache/hadoop/hbase/regionserver/wal/
Author: stack
Date: Fri Jul 16 22:28:21 2010
New Revision: 964965
URL: http://svn.apache.org/viewvc?rev=964965&view=rev
Log:
HBASE-2727 Splits writing one file only is untenable; need dir of recovered edits ordered by sequenceid
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=964965&r1=964964&r2=964965&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri Jul 16 22:28:21 2010
@@ -435,6 +435,8 @@ Release 0.21.0 - Unreleased
(Nicolas Spiegelberg via Stack)
HBASE-2781 ZKW.createUnassignedRegion doesn't make sure existing znode is
in the right state (Karthik Ranganathan via JD)
+ HBASE-2727 Splits writing one file only is untenable; need dir of recovered
+ edits ordered by sequenceid
IMPROVEMENTS
HBASE-1760 Cleanup TODOs in HTable
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=964965&r1=964964&r2=964965&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Jul 16 22:28:21 2010
@@ -19,6 +19,27 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Constructor;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -64,28 +85,6 @@ import org.apache.hadoop.util.StringUtil
import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Constructor;
-import java.util.AbstractList;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
/**
* HRegion stores data for a certain region of a table. It stores all columns
* for each row. A given table consists of one or more HRegions.
@@ -126,6 +125,7 @@ public class HRegion implements HeapSize
public static final Log LOG = LogFactory.getLog(HRegion.class);
static final String SPLITDIR = "splits";
static final String MERGEDIR = "merges";
+
final AtomicBoolean closed = new AtomicBoolean(false);
/* Closing can take some time; use the closing flag if there is stuff we don't
* want to do while in closing state; e.g. like offer this region up to the
@@ -330,9 +330,8 @@ public class HRegion implements HeapSize
// Remove temporary data left over from old regions
cleanupTmpDir();
- // Load in all the HStores. Get min and max seqids across all families.
+ // Load in all the HStores. Get maximum seqid.
long maxSeqId = -1;
- long minSeqId = Integer.MAX_VALUE;
for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
Store store = instantiateHStore(this.tableDir, c);
this.stores.put(c.getName(), store);
@@ -340,12 +339,9 @@ public class HRegion implements HeapSize
if (storeSeqId > maxSeqId) {
maxSeqId = storeSeqId;
}
- if (minSeqId > storeSeqId) {
- minSeqId = storeSeqId;
- }
}
// Recover any edits if available.
- long seqid = replayRecoveredEditsIfAny(this.regiondir, minSeqId, reporter);
+ maxSeqId = replayRecoveredEditsIfAny(this.regiondir, maxSeqId, reporter);
// Get rid of any splits or merges that were lost in-progress. Clean out
// these directories here on open. We may be opening a region that was
@@ -362,7 +358,7 @@ public class HRegion implements HeapSize
this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
// Use maximum of log sequenceid or that which was found in stores
// (particularly if no recovered edits, seqid will be -1).
- long nextSeqid = Math.max(seqid, maxSeqId) + 1;
+ long nextSeqid = maxSeqId + 1;
LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);
return nextSeqid;
}
@@ -902,7 +898,9 @@ public class HRegion implements HeapSize
}
/**
- * Flushing the cache is a little tricky. We have a lot of updates in the
+ * Flush the memstore.
+ *
+ * Flushing the memstore is a little tricky. We have a lot of updates in the
* memstore, all of which have also been written to the log. We need to
* write those updates in the memstore out to disk, while being able to
* process reads/writes as much as possible during the flush operation. Also,
@@ -934,6 +932,19 @@ public class HRegion implements HeapSize
* because a Snapshot was not properly persisted.
*/
protected boolean internalFlushcache() throws IOException {
+ return internalFlushcache(this.log, -1);
+ }
+
+ /**
+ * @param wal Null if we're NOT to go via hlog/wal.
+ * @param myseqid The seqid to use if <code>wal</code> is null writing out
+ * flush file.
+ * @return true if the region needs compacting
+ * @throws IOException
+ * @see {@link #internalFlushcache()}
+ */
+ protected boolean internalFlushcache(final HLog wal, final long myseqid)
+ throws IOException {
final long startTime = EnvironmentEdgeManager.currentTimeMillis();
// Clear flush flag.
// Record latest flush time
@@ -945,7 +956,8 @@ public class HRegion implements HeapSize
if (LOG.isDebugEnabled()) {
LOG.debug("Started memstore flush for region " + this +
". Current region memstore size " +
- StringUtils.humanReadableInt(this.memstoreSize.get()));
+ StringUtils.humanReadableInt(this.memstoreSize.get()) +
+ ((wal != null)? "": "; wal is null, using passed myseqid=" + myseqid));
}
// Stop updates while we snapshot the memstore of all stores. We only have
@@ -958,14 +970,14 @@ public class HRegion implements HeapSize
long sequenceId = -1L;
long completeSequenceId = -1L;
- // we have to take a write lock during snapshot, or else a write could
+ // We have to take a write lock during snapshot, or else a write could
// end up in both snapshot and memstore (makes it difficult to do atomic
// rows then)
this.updatesLock.writeLock().lock();
final long currentMemStoreSize = this.memstoreSize.get();
List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
try {
- sequenceId = log.startCacheFlush();
+ sequenceId = (wal == null)? myseqid: wal.startCacheFlush();
completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
for (Store s : stores.values()) {
@@ -1009,9 +1021,9 @@ public class HRegion implements HeapSize
}
try {
- if (atomicWork != null) {
- atomicWork.call();
- }
+ if (atomicWork != null) {
+ atomicWork.call();
+ }
// Switch snapshot (in memstore) -> new hfile (thus causing
// all the store scanners to reset/reseek).
@@ -1038,7 +1050,7 @@ public class HRegion implements HeapSize
// We used to only catch IOEs but its possible that we'd get other
// exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
// all and sundry.
- this.log.abortCacheFlush();
+ if (wal != null) wal.abortCacheFlush();
DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
Bytes.toStringBinary(getRegionName()));
dse.initCause(t);
@@ -1052,9 +1064,11 @@ public class HRegion implements HeapSize
// This tells future readers that the HStores were emitted correctly,
// and that all updates to the log for this regionName that have lower
// log-sequence-ids can be safely ignored.
- this.log.completeCacheFlush(getRegionName(),
+ if (wal != null) {
+ wal.completeCacheFlush(getRegionName(),
regionInfo.getTableDesc().getName(), completeSequenceId,
this.getRegionInfo().isMetaRegion());
+ }
// C. Finally notify anyone waiting on memstore to clear:
// e.g. checkResources().
@@ -1067,12 +1081,12 @@ public class HRegion implements HeapSize
LOG.info("Finished memstore flush of ~" +
StringUtils.humanReadableInt(currentMemStoreSize) + " for region " +
this + " in " + (now - startTime) + "ms, sequence id=" + sequenceId +
- ", compaction requested=" + compactionRequested);
+ ", compaction requested=" + compactionRequested +
+ ((wal == null)? "; wal=null": ""));
}
return compactionRequested;
}
-
/**
* A hook for sub classed wishing to perform operations prior to the cache
* flush commit stage.
@@ -1853,46 +1867,75 @@ public class HRegion implements HeapSize
* Read the edits log put under this region by wal log splitting process. Put
* the recovered edits back up into this region.
*
- * We can ignore any log message that has a sequence ID that's equal to or
+ * <p>We can ignore any log message that has a sequence ID that's equal to or
* lower than minSeqId. (Because we know such log messages are already
* reflected in the HFiles.)
+ *
+ * <p>While this is running we are putting pressure on memory yet we are
+ * outside of our usual accounting because we are not yet an onlined region
+ * (this stuff is being run as part of Region initialization). This means
+ * that if we're up against global memory limits, we'll not be flagged to flush
+ * because we are not online. We can't be flushed by usual mechanisms anyways;
+ * we're not yet online so our relative sequenceids are not yet aligned with
+ * HLog sequenceids -- not till we come up online, post processing of split
+ * edits.
+ *
+ * <p>But to help relieve memory pressure, at least manage our own heap size
+ * flushing if we are in excess of per-region limits. Flushing, though, we have
+ * to be careful and avoid using the regionserver/hlog sequenceid. Its running
+ * on a different line to what's going on in here in this region context so if we
+ * crashed replaying these edits, but in the midst had a flush that used the
+ * regionserver log with a sequenceid in excess of whats going on in here
+ * in this region and with its split editlogs, then we could miss edits the
+ * next time we go to recover. So, we have to flush inline, using seqids that
+ * make sense in this single region context only -- until we come online.
+ *
* @param regiondir
- * @param minSeqId Minimum sequenceid found in a store file. Edits in log
- * must be larger than this to be replayed.
+ * @param minSeqId Any edit found in split editlogs needs to be in excess of
+ * this minSeqId to be applied, else it's skipped.
* @param reporter
* @return the sequence id of the last edit added to this region out of the
- * recovered edits log, or -1 if no log recovered
+ * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
* @throws UnsupportedEncodingException
* @throws IOException
*/
protected long replayRecoveredEditsIfAny(final Path regiondir,
final long minSeqId, final Progressable reporter)
throws UnsupportedEncodingException, IOException {
- Path edits = new Path(regiondir, HLog.RECOVERED_EDITS);
- if (edits == null || !this.fs.exists(edits)) return -1;
- if (isZeroLengthThenDelete(this.fs, edits)) return -1;
- long maxSeqIdInLog = -1;
- try {
- maxSeqIdInLog = replayRecoveredEdits(edits, minSeqId, reporter);
- LOG.debug("Deleting recovered edits file: " + edits);
- if (!this.fs.delete(edits, false)) {
- LOG.error("Failed delete of " + edits);
- }
- } catch (IOException e) {
- boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
- if (skipErrors) {
- Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." +
- System.currentTimeMillis());
- LOG.error("hbase.skip.errors=true so continuing. Renamed " + edits +
- " as " + moveAsideName, e);
- if (!this.fs.rename(edits, moveAsideName)) {
- LOG.error("hbase.skip.errors=true so continuing. Rename failed");
+ long seqid = minSeqId;
+ NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
+ if (files == null || files.isEmpty()) return seqid;
+ for (Path edits: files) {
+ if (edits == null || !this.fs.exists(edits)) {
+ LOG.warn("Null or non-existent edits file: " + edits);
+ continue;
+ }
+ if (isZeroLengthThenDelete(this.fs, edits)) continue;
+ try {
+ seqid = replayRecoveredEdits(edits, seqid, reporter);
+ } catch (IOException e) {
+ boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
+ if (skipErrors) {
+ Path p = HLog.moveAsideBadEditsFile(fs, edits);
+ LOG.error("hbase.skip.errors=true so continuing. Renamed " + edits +
+ " as " + p, e);
+ } else {
+ throw e;
+ }
+ }
+ }
+ if (seqid > minSeqId) {
+ // Then we added some edits to memory. Flush and cleanup split edit files.
+ internalFlushcache(null, seqid);
+ for (Path file: files) {
+ if (!this.fs.delete(file, false)) {
+ LOG.error("Failed delete of " + file);
+ } else {
+ LOG.debug("Deleted recovered.edits file=" + file);
}
- } else {
- throw e;
}
}
- return maxSeqIdInLog;
+ return seqid;
}
/*
@@ -1901,12 +1944,13 @@ public class HRegion implements HeapSize
* must be larger than this to be replayed.
* @param reporter
* @return the sequence id of the last edit added to this region out of the
- * recovered edits log, or -1 if no log recovered
+ * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
* @throws IOException
*/
private long replayRecoveredEdits(final Path edits,
final long minSeqId, final Progressable reporter)
throws IOException {
+ LOG.info("Replaying edits from " + edits + "; minSeqId=" + minSeqId);
HLog.Reader reader = HLog.getReader(this.fs, edits, conf);
try {
return replayRecoveredEdits(reader, minSeqId, reporter);
@@ -1916,26 +1960,22 @@ public class HRegion implements HeapSize
}
/* @param reader Reader against file of recovered edits.
- * @param minSeqId Minimum sequenceid found in a store file. Edits in log
- * must be larger than this to be replayed.
+ * @param minSeqId Any edit found in split editlogs needs to be in excess of
+ * this minSeqId to be applied, else it's skipped.
* @param reporter
* @return the sequence id of the last edit added to this region out of the
- * recovered edits log, or -1 if no log recovered
+ * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
* @throws IOException
*/
private long replayRecoveredEdits(final HLog.Reader reader,
final long minSeqId, final Progressable reporter)
throws IOException {
- long currentEditSeqId = -1;
+ long currentEditSeqId = minSeqId;
long firstSeqIdInLog = -1;
long skippedEdits = 0;
long editsCount = 0;
HLog.Entry entry;
Store store = null;
- // Get map of family name to maximum sequence id. Do it here up front
- // because as we progress, the sequence id can change if we happen to flush
- // The flush ups the seqid for the Store. The new seqid can cause us skip edits.
- Map<byte [], Long> familyToOriginalMaxSeqId = familyToMaxSeqId(this.stores);
// How many edits to apply before we send a progress report.
int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
while ((entry = reader.next()) != null) {
@@ -1945,12 +1985,13 @@ public class HRegion implements HeapSize
firstSeqIdInLog = key.getLogSeqNum();
}
// Now, figure if we should skip this edit.
- currentEditSeqId = Math.max(currentEditSeqId, key.getLogSeqNum());
- if (key.getLogSeqNum() <= minSeqId) {
+ if (key.getLogSeqNum() <= currentEditSeqId) {
skippedEdits++;
continue;
}
- for (KeyValue kv : val.getKeyValues()) {
+ currentEditSeqId = key.getLogSeqNum();
+ boolean flush = false;
+ for (KeyValue kv: val.getKeyValues()) {
// Check this edit is for me. Also, guard against writing the special
// METACOLUMN info such as HBASE::CACHEFLUSH entries
if (kv.matchingFamily(HLog.METAFAMILY) ||
@@ -1969,16 +2010,13 @@ public class HRegion implements HeapSize
skippedEdits++;
continue;
}
- // The edits' id has to be in excess of the original max seqid of the
- // targeted store.
- long storeMaxSeqId = familyToOriginalMaxSeqId.get(store.getFamily().getName());
- if (currentEditSeqId < storeMaxSeqId) {
- skippedEdits++;
- continue;
- }
- restoreEdit(kv);
+ // Once we are over the limit, restoreEdit will keep returning true to
+ // flush -- but don't flush until we've played all the kvs that make up
+ // the WALEdit.
+ flush = restoreEdit(store, kv);
editsCount++;
}
+ if (flush) internalFlushcache(null, currentEditSeqId);
// Every 'interval' edits, tell the reporter we're making progress.
// Have seen 60k edits taking 3minutes to complete.
@@ -1994,34 +2032,14 @@ public class HRegion implements HeapSize
return currentEditSeqId;
}
- /*
- * @param stores
- * @return Map of family name to maximum sequenceid.
- */
- private Map<byte [], Long> familyToMaxSeqId(final Map<byte [], Store> stores) {
- Map<byte [], Long> map = new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
- for (Map.Entry<byte [], Store> e: stores.entrySet()) {
- map.put(e.getKey(), e.getValue().getMaxSequenceId());
- }
- return map;
- }
-
- /*
- * @param kv Apply this value to this region.
- * @throws IOException
+ /**
+ * Used by tests
+ * @param s Store to add edit too.
+ * @param kv KeyValue to add.
+ * @return True if we should flush.
*/
- // This method is protected so can be called from tests.
- protected void restoreEdit(final KeyValue kv) throws IOException {
- // This is really expensive to do per edit. Loads of object creation.
- // TODO: Optimization. Apply edits batched by family.
- Map<byte [], List<KeyValue>> map =
- new TreeMap<byte [], List<KeyValue>>(Bytes.BYTES_COMPARATOR);
- map.put(kv.getFamily(), Collections.singletonList(kv));
- if (kv.isDelete()) {
- delete(map, true);
- } else {
- put(map, true);
- }
+ protected boolean restoreEdit(final Store s, final KeyValue kv) {
+ return isFlushSize(this.memstoreSize.addAndGet(s.add(kv)));
}
/*
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=964965&r1=964964&r2=964965&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Fri Jul 16 22:28:21 2010
@@ -36,7 +36,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.NavigableMap;
+import java.util.NavigableSet;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -53,6 +53,7 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
@@ -61,6 +62,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
@@ -119,9 +121,17 @@ import com.google.common.util.concurrent
*/
public class HLog implements Syncable {
static final Log LOG = LogFactory.getLog(HLog.class);
- private static final String HLOG_DATFILE = "hlog.dat.";
public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
static final byte [] METAROW = Bytes.toBytes("METAROW");
+
+ /*
+ * Name of directory that holds recovered edits written by the wal log
+ * splitting code, one per region
+ */
+ private static final String RECOVERED_EDITS_DIR = "recovered.edits";
+ private static final Pattern EDITFILES_NAME_PATTERN =
+ Pattern.compile("-?[0-9]+");
+
private final FileSystem fs;
private final Path dir;
private final Configuration conf;
@@ -144,11 +154,6 @@ public class HLog implements Syncable {
private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
final static Object [] NO_ARGS = new Object []{};
- /** Name of file that holds recovered edits written by the wal log splitting
- * code, one per region
- */
- public static final String RECOVERED_EDITS = "recovered.edits";
-
// used to indirectly tell syncFs to force the sync
private boolean forceSync = false;
@@ -1459,7 +1464,7 @@ public class HLog implements Syncable {
NamingThreadFactory f = new NamingThreadFactory(
"SplitWriter-%1$d", Executors.defaultThreadFactory());
ThreadPoolExecutor threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(logWriterThreads, f);
- for (final byte[] region : splitLogsMap.keySet()) {
+ for (final byte [] region : splitLogsMap.keySet()) {
Callable splitter = createNewSplitter(rootDir, logWriters, splitLogsMap, region, fs, conf);
writeFutureResult.put(region, threadPool.submit(splitter));
}
@@ -1579,17 +1584,19 @@ public class HLog implements Syncable {
WriterAndPath wap = logWriters.get(region);
for (Entry logEntry: entries) {
if (wap == null) {
- Path logFile = getRegionLogPath(logEntry, rootDir);
- if (fs.exists(logFile)) {
- LOG.warn("Found existing old hlog file. It could be the result of a previous" +
- "failed split attempt. Deleting " + logFile +
- ", length=" + fs.getFileStatus(logFile).getLen());
- fs.delete(logFile, false);
+ Path regionedits = getRegionSplitEditsPath(fs, logEntry, rootDir);
+ if (fs.exists(regionedits)) {
+ LOG.warn("Found existing old edits file. It could be the " +
+ "result of a previous failed split attempt. Deleting " +
+ regionedits + ", length=" + fs.getFileStatus(regionedits).getLen());
+ if (!fs.delete(regionedits, false)) {
+ LOG.warn("Failed delete of old " + regionedits);
+ }
}
- Writer w = createWriter(fs, logFile, conf);
- wap = new WriterAndPath(logFile, w);
+ Writer w = createWriter(fs, regionedits, conf);
+ wap = new WriterAndPath(regionedits, w);
logWriters.put(region, wap);
- LOG.debug("Creating writer path=" + logFile +
+ LOG.debug("Creating writer path=" + regionedits +
" region=" + Bytes.toStringBinary(region));
}
wap.w.append(logEntry);
@@ -1643,14 +1650,101 @@ public class HLog implements Syncable {
}
}
- private static Path getRegionLogPath(Entry logEntry, Path rootDir) {
- Path tableDir =
- HTableDescriptor.getTableDir(rootDir, logEntry.getKey().getTablename());
- Path regionDir =
- HRegion.getRegionDir(tableDir, HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName()));
- return new Path(regionDir, RECOVERED_EDITS);
+ /*
+ * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
+ * <code>logEntry</code> named for the sequenceid in the passed
+ * <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
+ * This method also ensures existence of RECOVERED_EDITS_DIR under the region
+ * creating it if necessary.
+ * @param fs
+ * @param logEntry
+ * @param rootDir HBase root dir.
+ * @return Path to file into which to dump split log edits.
+ * @throws IOException
+ */
+ private static Path getRegionSplitEditsPath(final FileSystem fs,
+ final Entry logEntry, final Path rootDir)
+ throws IOException {
+ Path tableDir = HTableDescriptor.getTableDir(rootDir,
+ logEntry.getKey().getTablename());
+ Path regiondir = HRegion.getRegionDir(tableDir,
+ HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName()));
+ Path dir = getRegionDirRecoveredEditsDir(regiondir);
+ if (!fs.exists(dir)) {
+ if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
+ }
+ return new Path(dir,
+ formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum()));
}
+ static String formatRecoveredEditsFileName(final long seqid) {
+ return String.format("%019d", seqid);
+ }
+
+
+ /**
+ * Returns sorted set of edit files made by wal-log splitter.
+ * @param fs
+ * @param regiondir
+ * @return Files in passed <code>regiondir</code> as a sorted set.
+ * @throws IOException
+ */
+ public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
+ final Path regiondir)
+ throws IOException {
+ Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
+ FileStatus [] files = fs.listStatus(editsdir, new PathFilter () {
+ @Override
+ public boolean accept(Path p) {
+ boolean result = false;
+ try {
+ // Return files and only files that match the editfile names pattern.
+ // There can be other files in this directory other than edit files.
+ // In particular, on error, we'll move aside the bad edit file giving
+ // it a timestamp suffix. See moveAsideBadEditsFile.
+ Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
+ result = fs.isFile(p) && m.matches();
+ } catch (IOException e) {
+ LOG.warn("Failed isFile check on " + p);
+ }
+ return result;
+ }
+ });
+ NavigableSet<Path> filesSorted = new TreeSet<Path>();
+ if (files == null) return filesSorted;
+ for (FileStatus status: files) {
+ filesSorted.add(status.getPath());
+ }
+ return filesSorted;
+ }
+
+ /**
+ * Move aside a bad edits file.
+ * @param fs
+ * @param edits Edits file to move aside.
+ * @return The name of the moved aside file.
+ * @throws IOException
+ */
+ public static Path moveAsideBadEditsFile(final FileSystem fs,
+ final Path edits)
+ throws IOException {
+ Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." +
+ System.currentTimeMillis());
+ if (!fs.rename(edits, moveAsideName)) {
+ LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
+ }
+ return moveAsideName;
+ }
+
+ /**
+ * @param regiondir This regions directory in the filesystem.
+ * @return The directory that holds recovered edits files for the region
+ * <code>regiondir</code>
+ */
+ public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
+ return new Path(regiondir, RECOVERED_EDITS_DIR);
+ }
+
/**
*
* @param visitor
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=964965&r1=964964&r2=964965&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Fri Jul 16 22:28:21 2010
@@ -476,9 +476,6 @@ public class TestHLogSplit {
assertEquals(0, compareHLogSplitDirs(firstSplitPath, splitPath));
}
-
-
-
/**
* This thread will keep writing to the file after the split process has started
* It simulates a region server that was considered dead but woke up and wrote
@@ -610,11 +607,14 @@ public class TestHLogSplit {
}
}
- private Path getLogForRegion(Path rootdir, byte[] table, String region) {
- return new Path(HRegion.getRegionDir(HTableDescriptor
- .getTableDir(rootdir, table),
- HRegionInfo.encodeRegionName(region.getBytes())),
- HLog.RECOVERED_EDITS);
+ private Path getLogForRegion(Path rootdir, byte[] table, String region)
+ throws IOException {
+ Path tdir = HTableDescriptor.getTableDir(rootdir, table);
+ Path editsdir = HLog.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
+ HRegionInfo.encodeRegionName(region.getBytes())));
+ FileStatus [] files = this.fs.listStatus(editsdir);
+ assertEquals(1, files.length);
+ return files[0].getPath();
}
private void corruptHLog(Path path, Corruptions corruption, boolean close,
@@ -722,8 +722,15 @@ public class TestHLogSplit {
FileStatus[] f2 = fs.listStatus(p2);
for (int i=0; i<f1.length; i++) {
- if (!logsAreEqual(new Path(f1[i].getPath(), HLog.RECOVERED_EDITS),
- new Path(f2[i].getPath(), HLog.RECOVERED_EDITS))) {
+ // Regions now have a directory named RECOVERED_EDITS_DIR and in here
+ // are split edit files. In below presume only 1.
+ Path rd1 = HLog.getRegionDirRecoveredEditsDir(f1[i].getPath());
+ FileStatus [] rd1fs = fs.listStatus(rd1);
+ assertEquals(1, rd1fs.length);
+ Path rd2 = HLog.getRegionDirRecoveredEditsDir(f2[i].getPath());
+ FileStatus [] rd2fs = fs.listStatus(rd2);
+ assertEquals(1, rd2fs.length);
+ if (!logsAreEqual(rd1fs[0].getPath(), rd2fs[0].getPath())) {
return -1;
}
}
@@ -745,6 +752,4 @@ public class TestHLogSplit {
}
return true;
}
-
-
}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=964965&r1=964964&r2=964965&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Fri Jul 16 22:28:21 2010
@@ -42,8 +42,8 @@ import org.apache.hadoop.hbase.client.Ge
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -115,6 +115,57 @@ public class TestWALReplay {
}
/**
+ * Tests for hbase-2727.
+ * @throws Exception
+ * @see https://issues.apache.org/jira/browse/HBASE-2727
+ */
+ @Test
+ public void test2727() throws Exception {
+ // Test being able to have > 1 set of edits in the recovered.edits directory.
+ // Ensure edits are replayed properly.
+ final String tableNameStr = "test2727";
+ HRegionInfo hri = createBasic3FamilyHRegionInfo(tableNameStr);
+ Path basedir = new Path(hbaseRootDir, tableNameStr);
+ deleteDir(basedir);
+
+ final byte [] tableName = Bytes.toBytes(tableNameStr);
+ final byte [] rowName = tableName;
+
+ HLog wal1 = createWAL(this.conf);
+ // Add 1k to each family.
+ final int countPerFamily = 1000;
+ for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
+ addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal1);
+ }
+ wal1.close();
+ runWALSplit(this.conf);
+
+ HLog wal2 = createWAL(this.conf);
+ // Up the sequenceid so that these edits are after the ones added above.
+ wal2.setSequenceNumber(wal1.getSequenceNumber());
+ // Add 1k to each family.
+ for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
+ addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal2);
+ }
+ wal2.close();
+ runWALSplit(this.conf);
+
+ HLog wal3 = createWAL(this.conf);
+ wal3.setSequenceNumber(wal2.getSequenceNumber());
+ try {
+ final HRegion region = new HRegion(basedir, wal3, this.fs, this.conf, hri,
+ null);
+ long seqid = region.initialize();
+ assertTrue(seqid > wal3.getSequenceNumber());
+
+ // TODO: Scan all.
+ region.close();
+ } finally {
+ wal3.closeAndDelete();
+ }
+ }
+
+ /**
* Test case of HRegion that is only made out of bulk loaded files. Assert
* that we don't 'crash'.
* @throws IOException
@@ -210,8 +261,8 @@ public class TestWALReplay {
HLog wal2 = createWAL(this.conf);
HRegion region2 = new HRegion(basedir, wal2, this.fs, this.conf, hri, null) {
@Override
- protected void restoreEdit(KeyValue kv) throws IOException {
- super.restoreEdit(kv);
+ protected boolean restoreEdit(Store s, KeyValue kv) {
+ super.restoreEdit(s, kv);
throw new RuntimeException("Called when it should not have been!");
}
};
@@ -221,7 +272,7 @@ public class TestWALReplay {
assertTrue(seqid + result.size() < seqid2);
// Next test. Add more edits, then 'crash' this region by stealing its wal
- // out from under it and assert that replay of the log addes the edits back
+ // out from under it and assert that replay of the log adds the edits back
// correctly when region is opened again.
for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
@@ -242,9 +293,10 @@ public class TestWALReplay {
final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, null) {
@Override
- protected void restoreEdit(KeyValue kv) throws IOException {
- super.restoreEdit(kv);
+ protected boolean restoreEdit(Store s, KeyValue kv) {
+ boolean b = super.restoreEdit(s, kv);
countOfRestoredEdits.incrementAndGet();
+ return b;
}
};
long seqid3 = region3.initialize();
@@ -317,14 +369,20 @@ public class TestWALReplay {
newConf.setInt("hbase.hregion.memstore.flush.size", 1024 * 100);
// Make a new wal for new region.
HLog newWal = createWAL(newConf);
+ final AtomicInteger flushcount = new AtomicInteger(0);
try {
- TestFlusher flusher = new TestFlusher();
final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri,
- flusher);
- flusher.r = region;
+ null) {
+ protected boolean internalFlushcache(HLog wal, long myseqid)
+ throws IOException {
+ boolean b = super.internalFlushcache(wal, myseqid);
+ flushcount.incrementAndGet();
+ return b;
+ };
+ };
long seqid = region.initialize();
- // Assert we flushed.
- assertTrue(flusher.count > 0);
+ // We flushed during init.
+ assertTrue(flushcount.get() > 0);
assertTrue(seqid > wal.getSequenceNumber());
Get get = new Get(rowName);
@@ -338,23 +396,6 @@ public class TestWALReplay {
}
}
- // Flusher used in this test. Keep count of how often we are called and
- // actually run the flush inside here.
- class TestFlusher implements FlushRequester {
- private int count = 0;
- private HRegion r;
-
- @Override
- public void request(HRegion region) {
- count++;
- try {
- r.flushcache();
- } catch (IOException e) {
- throw new RuntimeException("Exception flushing", e);
- }
- }
- }
-
private void addWALEdits (final byte [] tableName, final HRegionInfo hri,
final byte [] rowName, final byte [] family,
final int count, EnvironmentEdge ee, final HLog wal)