You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2010/08/27 07:01:07 UTC
svn commit: r990018 [3/10] - in /hbase/branches/0.90_master_rewrite: ./ bin/
bin/replication/ src/assembly/ src/docbkx/
src/main/java/org/apache/hadoop/hbase/
src/main/java/org/apache/hadoop/hbase/client/
src/main/java/org/apache/hadoop/hbase/filter/ s...
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Aug 27 05:01:02 2010
@@ -26,7 +26,6 @@ import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -35,7 +34,6 @@ import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -68,7 +66,6 @@ import org.apache.hadoop.hbase.client.Sc
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.io.Reference.Range;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -124,8 +121,8 @@ import com.google.common.collect.Lists;
*/
public class HRegion implements HeapSize { // , Writable{
public static final Log LOG = LogFactory.getLog(HRegion.class);
- static final String SPLITDIR = "splits";
static final String MERGEDIR = "merges";
+
final AtomicBoolean closed = new AtomicBoolean(false);
/* Closing can take some time; use the closing flag if there is stuff we don't
* want to do while in closing state; e.g. like offer this region up to the
@@ -218,15 +215,12 @@ public class HRegion implements HeapSize
private final long blockingMemStoreSize;
final long threadWakeFrequency;
// Used to guard splits and closes
- private final ReentrantReadWriteLock splitsAndClosesLock =
- new ReentrantReadWriteLock();
- private final ReentrantReadWriteLock newScannerLock =
+ final ReentrantReadWriteLock lock =
new ReentrantReadWriteLock();
// Stop updates lock
private final ReentrantReadWriteLock updatesLock =
new ReentrantReadWriteLock();
- private final Object splitLock = new Object();
private boolean splitRequest;
private final ReadWriteConsistencyControl rwcc =
@@ -288,7 +282,7 @@ public class HRegion implements HeapSize
this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY,
10 * 1000);
String encodedNameStr = this.regionInfo.getEncodedName();
- this.regiondir = new Path(tableDir, encodedNameStr);
+ this.regiondir = getRegionDir(this.tableDir, encodedNameStr);
if (LOG.isDebugEnabled()) {
// Write out region name as string and its encoded name.
LOG.debug("Creating region " + this);
@@ -321,15 +315,18 @@ public class HRegion implements HeapSize
*/
public long initialize(final Progressable reporter)
throws IOException {
+ // A region can be reopened if it failed a split; reset flags
+ this.closing.set(false);
+ this.closed.set(false);
+
// Write HRI to a file in case we need to recover .META.
checkRegioninfoOnFilesystem();
// Remove temporary data left over from old regions
cleanupTmpDir();
-
- // Load in all the HStores. Get min and max seqids across all families.
+
+ // Load in all the HStores. Get maximum seqid.
long maxSeqId = -1;
- long minSeqId = Integer.MAX_VALUE;
for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
Store store = instantiateHStore(this.tableDir, c);
this.stores.put(c.getName(), store);
@@ -337,17 +334,14 @@ public class HRegion implements HeapSize
if (storeSeqId > maxSeqId) {
maxSeqId = storeSeqId;
}
- if (minSeqId > storeSeqId) {
- minSeqId = storeSeqId;
- }
}
// Recover any edits if available.
- long seqid = replayRecoveredEditsIfAny(this.regiondir, minSeqId, reporter);
+ maxSeqId = replayRecoveredEditsIfAny(this.regiondir, maxSeqId, reporter);
// Get rid of any splits or merges that were lost in-progress. Clean out
// these directories here on open. We may be opening a region that was
// being split but we crashed in the middle of it all.
- FSUtils.deleteDirectory(this.fs, new Path(regiondir, SPLITDIR));
+ SplitTransaction.cleanupAnySplitDetritus(this);
FSUtils.deleteDirectory(this.fs, new Path(regiondir, MERGEDIR));
// See if region is meant to run read-only.
@@ -359,7 +353,7 @@ public class HRegion implements HeapSize
this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
// Use maximum of log sequenceid or that which was found in stores
// (particularly if no recovered edits, seqid will be -1).
- long nextSeqid = Math.max(seqid, maxSeqId) + 1;
+ long nextSeqid = maxSeqId + 1;
LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);
return nextSeqid;
}
@@ -370,7 +364,7 @@ public class HRegion implements HeapSize
* @param initialFiles
* @throws IOException
*/
- private static void moveInitialFilesIntoPlace(final FileSystem fs,
+ static void moveInitialFilesIntoPlace(final FileSystem fs,
final Path initialFiles, final Path regiondir)
throws IOException {
if (initialFiles != null && fs.exists(initialFiles)) {
@@ -469,70 +463,61 @@ public class HRegion implements HeapSize
*
* @throws IOException e
*/
- public List<StoreFile> close(final boolean abort) throws IOException {
+ public List<StoreFile> close(final boolean abort)
+ throws IOException {
if (isClosed()) {
- LOG.warn("region " + this + " already closed");
+ LOG.warn("Region " + this + " already closed");
return null;
}
- synchronized (splitLock) {
- boolean wasFlushing = false;
- synchronized (writestate) {
- // Disable compacting and flushing by background threads for this
- // region.
- writestate.writesEnabled = false;
- wasFlushing = writestate.flushing;
- LOG.debug("Closing " + this + ": disabling compactions & flushes");
- while (writestate.compacting || writestate.flushing) {
- LOG.debug("waiting for" +
- (writestate.compacting ? " compaction" : "") +
- (writestate.flushing ?
- (writestate.compacting ? "," : "") + " cache flush" :
- "") + " to complete for region " + this);
- try {
- writestate.wait();
- } catch (InterruptedException iex) {
- // continue
- }
+ boolean wasFlushing = false;
+ synchronized (writestate) {
+ // Disable compacting and flushing by background threads for this
+ // region.
+ writestate.writesEnabled = false;
+ wasFlushing = writestate.flushing;
+ LOG.debug("Closing " + this + ": disabling compactions & flushes");
+ while (writestate.compacting || writestate.flushing) {
+ LOG.debug("waiting for" +
+ (writestate.compacting ? " compaction" : "") +
+ (writestate.flushing ?
+ (writestate.compacting ? "," : "") + " cache flush" :
+ "") + " to complete for region " + this);
+ try {
+ writestate.wait();
+ } catch (InterruptedException iex) {
+ // continue
}
}
- // If we were not just flushing, is it worth doing a preflush...one
- // that will clear out of the bulk of the memstore before we put up
- // the close flag?
- if (!abort && !wasFlushing && worthPreFlushing()) {
- LOG.info("Running close preflush of " + this.getRegionNameAsString());
+ }
+ // If we were not just flushing, is it worth doing a preflush...one
+ // that will clear out of the bulk of the memstore before we put up
+ // the close flag?
+ if (!abort && !wasFlushing && worthPreFlushing()) {
+ LOG.info("Running close preflush of " + this.getRegionNameAsString());
+ internalFlushcache();
+ }
+ this.closing.set(true);
+ lock.writeLock().lock();
+ try {
+ if (this.isClosed()) {
+ // SplitTransaction handles the null
+ return null;
+ }
+ LOG.debug("Updates disabled for region " + this);
+ // Don't flush the cache if we are aborting
+ if (!abort) {
internalFlushcache();
}
- newScannerLock.writeLock().lock();
- this.closing.set(true);
- try {
- splitsAndClosesLock.writeLock().lock();
- LOG.debug("Updates disabled for region, no outstanding scanners on " +
- this);
- try {
- // Write lock means no more row locks can be given out. Wait on
- // outstanding row locks to come in before we close so we do not drop
- // outstanding updates.
- waitOnRowLocks();
- LOG.debug("No more row locks outstanding on region " + this);
-
- // Don't flush the cache if we are aborting
- if (!abort) {
- internalFlushcache();
- }
- List<StoreFile> result = new ArrayList<StoreFile>();
- for (Store store: stores.values()) {
- result.addAll(store.close());
- }
- this.closed.set(true);
- LOG.info("Closed " + this);
- return result;
- } finally {
- splitsAndClosesLock.writeLock().unlock();
- }
- } finally {
- newScannerLock.writeLock().unlock();
- }
+ List<StoreFile> result = new ArrayList<StoreFile>();
+ for (Store store : stores.values()) {
+ result.addAll(store.close());
+ }
+ this.closed.set(true);
+ LOG.info("Closed " + this);
+ return result;
+ } finally {
+ lock.writeLock().unlock();
}
}
@@ -593,6 +578,17 @@ public class HRegion implements HeapSize
return this.regiondir;
}
+ /**
+ * Computes the Path of the HRegion
+ *
+ * @param tabledir qualified path for table
+ * @param name ENCODED region name
+ * @return Path of HRegion directory
+ */
+ public static Path getRegionDir(final Path tabledir, final String name) {
+ return new Path(tabledir, name);
+ }
+
/** @return FileSystem being used by this region */
public FileSystem getFilesystem() {
return this.fs;
@@ -623,113 +619,6 @@ public class HRegion implements HeapSize
}
/*
- * Split the HRegion to create two brand-new ones. This also closes
- * current HRegion. Split should be fast since we don't rewrite store files
- * but instead create new 'reference' store files that read off the top and
- * bottom ranges of parent store files.
- * @param splitRow row on which to split region
- * @return two brand-new HRegions or null if a split is not needed
- * @throws IOException
- */
- HRegion [] splitRegion(final byte [] splitRow) throws IOException {
- prepareToSplit();
- synchronized (splitLock) {
- if (closed.get()) {
- return null;
- }
- // Add start/end key checking: hbase-428.
- byte [] startKey = this.regionInfo.getStartKey();
- byte [] endKey = this.regionInfo.getEndKey();
- if (this.comparator.matchingRows(startKey, 0, startKey.length,
- splitRow, 0, splitRow.length)) {
- LOG.debug("Startkey and midkey are same, not splitting");
- return null;
- }
- if (this.comparator.matchingRows(splitRow, 0, splitRow.length,
- endKey, 0, endKey.length)) {
- LOG.debug("Endkey and midkey are same, not splitting");
- return null;
- }
- LOG.info("Starting split of region " + this);
- Path splits = new Path(this.regiondir, SPLITDIR);
- if(!this.fs.exists(splits)) {
- this.fs.mkdirs(splits);
- }
- // Calculate regionid to use. Can't be less than that of parent else
- // it'll insert into wrong location over in .META. table: HBASE-710.
- long rid = EnvironmentEdgeManager.currentTimeMillis();
- if (rid < this.regionInfo.getRegionId()) {
- LOG.warn("Clock skew; parent regions id is " +
- this.regionInfo.getRegionId() + " but current time here is " + rid);
- rid = this.regionInfo.getRegionId() + 1;
- }
- HRegionInfo regionAInfo = new HRegionInfo(this.regionInfo.getTableDesc(),
- startKey, splitRow, false, rid);
- Path dirA = getSplitDirForDaughter(splits, regionAInfo);
- HRegionInfo regionBInfo = new HRegionInfo(this.regionInfo.getTableDesc(),
- splitRow, endKey, false, rid);
- Path dirB = getSplitDirForDaughter(splits, regionBInfo);
-
- // Now close the HRegion. Close returns all store files or null if not
- // supposed to close (? What to do in this case? Implement abort of close?)
- // Close also does wait on outstanding rows and calls a flush just-in-case.
- List<StoreFile> hstoreFilesToSplit = close(false);
- if (hstoreFilesToSplit == null) {
- LOG.warn("Close came back null (Implement abort of close?)");
- throw new RuntimeException("close returned empty vector of HStoreFiles");
- }
-
- // Split each store file.
- for(StoreFile h: hstoreFilesToSplit) {
- StoreFile.split(fs,
- Store.getStoreHomedir(splits, regionAInfo.getEncodedName(),
- h.getFamily()),
- h, splitRow, Range.bottom);
- StoreFile.split(fs,
- Store.getStoreHomedir(splits, regionBInfo.getEncodedName(),
- h.getFamily()),
- h, splitRow, Range.top);
- }
-
- // Create a region instance and then move the splits into place under
- // regionA and regionB.
- HRegion regionA =
- HRegion.newHRegion(tableDir, log, fs, conf, regionAInfo, null);
- moveInitialFilesIntoPlace(this.fs, dirA, regionA.getRegionDir());
- HRegion regionB =
- HRegion.newHRegion(tableDir, log, fs, conf, regionBInfo, null);
- moveInitialFilesIntoPlace(this.fs, dirB, regionB.getRegionDir());
-
- return new HRegion [] {regionA, regionB};
- }
- }
-
- /*
- * Get the daughter directories in the splits dir. The splits dir is under
- * the parent regions' directory.
- * @param splits
- * @param hri
- * @return Path to split dir.
- * @throws IOException
- */
- private Path getSplitDirForDaughter(final Path splits, final HRegionInfo hri)
- throws IOException {
- Path d =
- new Path(splits, hri.getEncodedName());
- if (fs.exists(d)) {
- // This should never happen; the splits dir will be newly made when we
- // come in here. Even if we crashed midway through a split, the reopen
- // of the parent region clears out the dir in its initialize method.
- throw new IOException("Cannot split; target file collision at " + d);
- }
- return d;
- }
-
- protected void prepareToSplit() {
- // nothing
- }
-
- /*
* Do preparation for pending compaction.
* @throws IOException
*/
@@ -796,12 +685,16 @@ public class HRegion implements HeapSize
*/
byte [] compactStores(final boolean majorCompaction)
throws IOException {
- if (this.closing.get() || this.closed.get()) {
- LOG.debug("Skipping compaction on " + this + " because closing/closed");
+ if (this.closing.get()) {
+ LOG.debug("Skipping compaction on " + this + " because closing");
return null;
}
- splitsAndClosesLock.readLock().lock();
+ lock.readLock().lock();
try {
+ if (this.closed.get()) {
+ LOG.debug("Skipping compaction on " + this + " because closed");
+ return null;
+ }
byte [] splitRow = null;
if (this.closed.get()) {
return splitRow;
@@ -840,7 +733,7 @@ public class HRegion implements HeapSize
}
return splitRow;
} finally {
- splitsAndClosesLock.readLock().unlock();
+ lock.readLock().unlock();
}
}
@@ -865,41 +758,48 @@ public class HRegion implements HeapSize
* because a Snapshot was not properly persisted.
*/
public boolean flushcache() throws IOException {
- if (this.closed.get()) {
+ // fail-fast instead of waiting on the lock
+ if (this.closing.get()) {
+ LOG.debug("Skipping flush on " + this + " because closing");
return false;
}
- synchronized (writestate) {
- if (!writestate.flushing && writestate.writesEnabled) {
- this.writestate.flushing = true;
- } else {
- if(LOG.isDebugEnabled()) {
- LOG.debug("NOT flushing memstore for region " + this +
- ", flushing=" +
- writestate.flushing + ", writesEnabled=" +
- writestate.writesEnabled);
- }
+ lock.readLock().lock();
+ try {
+ if (this.closed.get()) {
+ LOG.debug("Skipping flush on " + this + " because closed");
return false;
}
- }
- try {
- // Prevent splits and closes
- splitsAndClosesLock.readLock().lock();
try {
+ synchronized (writestate) {
+ if (!writestate.flushing && writestate.writesEnabled) {
+ this.writestate.flushing = true;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("NOT flushing memstore for region " + this +
+ ", flushing=" +
+ writestate.flushing + ", writesEnabled=" +
+ writestate.writesEnabled);
+ }
+ return false;
+ }
+ }
return internalFlushcache();
} finally {
- splitsAndClosesLock.readLock().unlock();
+ synchronized (writestate) {
+ writestate.flushing = false;
+ this.writestate.flushRequested = false;
+ writestate.notifyAll();
+ }
}
} finally {
- synchronized (writestate) {
- writestate.flushing = false;
- this.writestate.flushRequested = false;
- writestate.notifyAll();
- }
+ lock.readLock().unlock();
}
}
/**
- * Flushing the cache is a little tricky. We have a lot of updates in the
+ * Flush the memstore.
+ *
+ * Flushing the memstore is a little tricky. We have a lot of updates in the
* memstore, all of which have also been written to the log. We need to
* write those updates in the memstore out to disk, while being able to
* process reads/writes as much as possible during the flush operation. Also,
@@ -931,6 +831,19 @@ public class HRegion implements HeapSize
* because a Snapshot was not properly persisted.
*/
protected boolean internalFlushcache() throws IOException {
+ return internalFlushcache(this.log, -1);
+ }
+
+ /**
+ * @param wal Null if we're NOT to go via hlog/wal.
+ * @param myseqid The seqid to use if <code>wal</code> is null writing out
+ * flush file.
+ * @return true if the region needs compacting
+ * @throws IOException
+ * @see #internalFlushcache()
+ */
+ protected boolean internalFlushcache(final HLog wal, final long myseqid)
+ throws IOException {
final long startTime = EnvironmentEdgeManager.currentTimeMillis();
// Clear flush flag.
// Record latest flush time
@@ -942,7 +855,8 @@ public class HRegion implements HeapSize
if (LOG.isDebugEnabled()) {
LOG.debug("Started memstore flush for region " + this +
". Current region memstore size " +
- StringUtils.humanReadableInt(this.memstoreSize.get()));
+ StringUtils.humanReadableInt(this.memstoreSize.get()) +
+ ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid));
}
// Stop updates while we snapshot the memstore of all stores. We only have
@@ -955,14 +869,14 @@ public class HRegion implements HeapSize
long sequenceId = -1L;
long completeSequenceId = -1L;
- // we have to take a write lock during snapshot, or else a write could
+ // We have to take a write lock during snapshot, or else a write could
// end up in both snapshot and memstore (makes it difficult to do atomic
// rows then)
this.updatesLock.writeLock().lock();
final long currentMemStoreSize = this.memstoreSize.get();
List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
try {
- sequenceId = log.startCacheFlush();
+ sequenceId = (wal == null)? myseqid: wal.startCacheFlush();
completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
for (Store s : stores.values()) {
@@ -992,38 +906,14 @@ public class HRegion implements HeapSize
for (StoreFlusher flusher : storeFlushers) {
flusher.flushCache();
}
-
- Callable<Void> atomicWork = internalPreFlushcacheCommit();
-
- LOG.debug("Caches flushed, doing commit now (which includes update scanners)");
-
- /**
- * Switch between memstore(snapshot) and the new store file
- */
- if (atomicWork != null) {
- LOG.debug("internalPreFlushcacheCommit gives us work to do, acquiring newScannerLock");
- newScannerLock.writeLock().lock();
- }
-
- try {
- if (atomicWork != null) {
- atomicWork.call();
- }
-
- // Switch snapshot (in memstore) -> new hfile (thus causing
- // all the store scanners to reset/reseek).
- for (StoreFlusher flusher : storeFlushers) {
- boolean needsCompaction = flusher.commit();
- if (needsCompaction) {
- compactionRequested = true;
- }
- }
- } finally {
- if (atomicWork != null) {
- newScannerLock.writeLock().unlock();
+ // Switch snapshot (in memstore) -> new hfile (thus causing
+ // all the store scanners to reset/reseek).
+ for (StoreFlusher flusher : storeFlushers) {
+ boolean needsCompaction = flusher.commit();
+ if (needsCompaction) {
+ compactionRequested = true;
}
}
-
storeFlushers.clear();
// Set down the memstore size by amount of flush.
@@ -1035,7 +925,7 @@ public class HRegion implements HeapSize
// We used to only catch IOEs but its possible that we'd get other
// exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
// all and sundry.
- this.log.abortCacheFlush();
+ if (wal != null) wal.abortCacheFlush();
DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
Bytes.toStringBinary(getRegionName()));
dse.initCause(t);
@@ -1049,9 +939,11 @@ public class HRegion implements HeapSize
// This tells future readers that the HStores were emitted correctly,
// and that all updates to the log for this regionName that have lower
// log-sequence-ids can be safely ignored.
- this.log.completeCacheFlush(getRegionName(),
+ if (wal != null) {
+ wal.completeCacheFlush(getRegionName(),
regionInfo.getTableDesc().getName(), completeSequenceId,
this.getRegionInfo().isMetaRegion());
+ }
// C. Finally notify anyone waiting on memstore to clear:
// e.g. checkResources().
@@ -1063,27 +955,13 @@ public class HRegion implements HeapSize
long now = EnvironmentEdgeManager.currentTimeMillis();
LOG.info("Finished memstore flush of ~" +
StringUtils.humanReadableInt(currentMemStoreSize) + " for region " +
- this + " in " + (now - startTime) + "ms, sequence id=" + sequenceId +
- ", compaction requested=" + compactionRequested);
+ this + " in " + (now - startTime) + "ms, sequenceid=" + sequenceId +
+ ", compaction requested=" + compactionRequested +
+ ((wal == null)? "; wal=null": ""));
}
return compactionRequested;
}
-
- /**
- * A hook for sub classed wishing to perform operations prior to the cache
- * flush commit stage.
- *
- * If a subclass wishes that an atomic update of their work and the
- * flush commit stage happens, they should return a callable. The new scanner
- * lock will be acquired and released.
-
- * @throws java.io.IOException allow children to throw exception
- */
- protected Callable<Void> internalPreFlushcacheCommit() throws IOException {
- return null;
- }
-
/**
* Get the sequence number to be associated with this cache flush. Used by
* TransactionalRegion to not complete pending transactions.
@@ -1129,7 +1007,7 @@ public class HRegion implements HeapSize
// closest key is across all column families, since the data may be sparse
KeyValue key = null;
checkRow(row);
- splitsAndClosesLock.readLock().lock();
+ startRegionOperation();
try {
Store store = getStore(family);
KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
@@ -1142,7 +1020,7 @@ public class HRegion implements HeapSize
get.addFamily(family);
return get(get, null);
} finally {
- splitsAndClosesLock.readLock().unlock();
+ closeRegionOperation();
}
}
@@ -1162,11 +1040,8 @@ public class HRegion implements HeapSize
}
protected InternalScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
- newScannerLock.readLock().lock();
+ startRegionOperation();
try {
- if (this.closing.get() || this.closed.get()) {
- throw new NotServingRegionException("Region " + this + " closed");
- }
// Verify families are all valid
if(scan.hasFamilies()) {
for(byte [] family : scan.getFamilyMap().keySet()) {
@@ -1180,7 +1055,7 @@ public class HRegion implements HeapSize
return instantiateInternalScanner(scan, additionalScanners);
} finally {
- newScannerLock.readLock().unlock();
+ closeRegionOperation();
}
}
@@ -1222,7 +1097,7 @@ public class HRegion implements HeapSize
checkReadOnly();
checkResources();
Integer lid = null;
- splitsAndClosesLock.readLock().lock();
+ startRegionOperation();
try {
byte [] row = delete.getRow();
// If we did not pass an existing row lock, obtain a new one
@@ -1234,7 +1109,7 @@ public class HRegion implements HeapSize
} finally {
if(lockid == null) releaseRowLock(lid);
- splitsAndClosesLock.readLock().unlock();
+ closeRegionOperation();
}
}
@@ -1367,8 +1242,7 @@ public class HRegion implements HeapSize
// read lock, resources may run out. For now, the thought is that this
// will be extremely rare; we'll deal with it when it happens.
checkResources();
- splitsAndClosesLock.readLock().lock();
-
+ startRegionOperation();
try {
// We obtain a per-row lock, so other clients will block while one client
// performs an update. The read lock is released by the client calling
@@ -1386,7 +1260,7 @@ public class HRegion implements HeapSize
if(lockid == null) releaseRowLock(lid);
}
} finally {
- splitsAndClosesLock.readLock().unlock();
+ closeRegionOperation();
}
}
@@ -1439,12 +1313,12 @@ public class HRegion implements HeapSize
checkResources();
long newSize;
- splitsAndClosesLock.readLock().lock();
+ startRegionOperation();
try {
long addedSize = doMiniBatchPut(batchOp);
newSize = memstoreSize.addAndGet(addedSize);
} finally {
- splitsAndClosesLock.readLock().unlock();
+ closeRegionOperation();
}
if (isFlushSize(newSize)) {
requestFlush();
@@ -1582,7 +1456,7 @@ public class HRegion implements HeapSize
if (!isPut && !(w instanceof Delete))
throw new IOException("Action must be Put or Delete");
- splitsAndClosesLock.readLock().lock();
+ startRegionOperation();
try {
RowLock lock = isPut ? ((Put)w).getRowLock() : ((Delete)w).getRowLock();
Get get = new Get(row, lock);
@@ -1596,7 +1470,8 @@ public class HRegion implements HeapSize
result = get(get);
boolean matches = false;
- if (result.size() == 0 && expectedValue.length == 0) {
+ if (result.size() == 0 &&
+ (expectedValue == null || expectedValue.length == 0)) {
matches = true;
} else if (result.size() == 1) {
//Compare the expected value with the actual value
@@ -1620,7 +1495,7 @@ public class HRegion implements HeapSize
if(lockId == null) releaseRowLock(lid);
}
} finally {
- splitsAndClosesLock.readLock().unlock();
+ closeRegionOperation();
}
}
@@ -1828,46 +1703,75 @@ public class HRegion implements HeapSize
* Read the edits log put under this region by wal log splitting process. Put
* the recovered edits back up into this region.
*
- * We can ignore any log message that has a sequence ID that's equal to or
+ * <p>We can ignore any log message that has a sequence ID that's equal to or
* lower than minSeqId. (Because we know such log messages are already
* reflected in the HFiles.)
+ *
+ * <p>While this is running we are putting pressure on memory yet we are
+ * outside of our usual accounting because we are not yet an onlined region
+ * (this stuff is being run as part of Region initialization). This means
+ * that if we're up against global memory limits, we'll not be flagged to flush
+ * because we are not online. We can't be flushed by usual mechanisms anyways;
+ * we're not yet online so our relative sequenceids are not yet aligned with
+ * HLog sequenceids -- not till we come up online, post processing of split
+ * edits.
+ *
+ * <p>But to help relieve memory pressure, we at least manage our own heap size,
+ * flushing if we are in excess of per-region limits. When flushing, though, we
+ * have to be careful to avoid using the regionserver/hlog sequenceid. It is
+ * running on a different line to what's going on in here in this region context,
+ * so if we crashed replaying these edits, but in the midst had a flush that used
+ * the regionserver log with a sequenceid in excess of what's going on in here
+ * in this region and with its split editlogs, then we could miss edits the
+ * next time we go to recover. So, we have to flush inline, using seqids that
+ * make sense in this single-region context only -- until we come online.
+ *
* @param regiondir
- * @param minSeqId Minimum sequenceid found in a store file. Edits in log
- * must be larger than this to be replayed.
+ * @param minSeqId Any edit found in split editlogs needs to be in excess of
+ * this minSeqId to be applied, else it is skipped.
* @param reporter
* @return the sequence id of the last edit added to this region out of the
- * recovered edits log, or -1 if no log recovered
+ * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
* @throws UnsupportedEncodingException
* @throws IOException
*/
protected long replayRecoveredEditsIfAny(final Path regiondir,
final long minSeqId, final Progressable reporter)
throws UnsupportedEncodingException, IOException {
- Path edits = new Path(regiondir, HLog.RECOVERED_EDITS);
- if (edits == null || !this.fs.exists(edits)) return -1;
- if (isZeroLengthThenDelete(this.fs, edits)) return -1;
- long maxSeqIdInLog = -1;
- try {
- maxSeqIdInLog = replayRecoveredEdits(edits, minSeqId, reporter);
- LOG.debug("Deleting recovered edits file: " + edits);
- if (!this.fs.delete(edits, false)) {
- LOG.error("Failed delete of " + edits);
- }
- } catch (IOException e) {
- boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
- if (skipErrors) {
- Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." +
- System.currentTimeMillis());
- LOG.error("hbase.skip.errors=true so continuing. Renamed " + edits +
- " as " + moveAsideName, e);
- if (!this.fs.rename(edits, moveAsideName)) {
- LOG.error("hbase.skip.errors=true so continuing. Rename failed");
+ long seqid = minSeqId;
+ NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
+ if (files == null || files.isEmpty()) return seqid;
+ for (Path edits: files) {
+ if (edits == null || !this.fs.exists(edits)) {
+ LOG.warn("Null or non-existent edits file: " + edits);
+ continue;
+ }
+ if (isZeroLengthThenDelete(this.fs, edits)) continue;
+ try {
+ seqid = replayRecoveredEdits(edits, seqid, reporter);
+ } catch (IOException e) {
+ boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
+ if (skipErrors) {
+ Path p = HLog.moveAsideBadEditsFile(fs, edits);
+ LOG.error("hbase.skip.errors=true so continuing. Renamed " + edits +
+ " as " + p, e);
+ } else {
+ throw e;
+ }
+ }
+ }
+ if (seqid > minSeqId) {
+ // Then we added some edits to memory. Flush and cleanup split edit files.
+ internalFlushcache(null, seqid);
+ for (Path file: files) {
+ if (!this.fs.delete(file, false)) {
+ LOG.error("Failed delete of " + file);
+ } else {
+ LOG.debug("Deleted recovered.edits file=" + file);
}
- } else {
- throw e;
}
}
- return maxSeqIdInLog;
+ return seqid;
}
/*
@@ -1876,12 +1780,13 @@ public class HRegion implements HeapSize
* must be larger than this to be replayed.
* @param reporter
* @return the sequence id of the last edit added to this region out of the
- * recovered edits log, or -1 if no log recovered
+ * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
* @throws IOException
*/
private long replayRecoveredEdits(final Path edits,
final long minSeqId, final Progressable reporter)
throws IOException {
+ LOG.info("Replaying edits from " + edits + "; minSequenceid=" + minSeqId);
HLog.Reader reader = HLog.getReader(this.fs, edits, conf);
try {
return replayRecoveredEdits(reader, minSeqId, reporter);
@@ -1891,26 +1796,22 @@ public class HRegion implements HeapSize
}
/* @param reader Reader against file of recovered edits.
- * @param minSeqId Minimum sequenceid found in a store file. Edits in log
- * must be larger than this to be replayed.
+ * @param minSeqId Any edit found in split editlogs needs to be in excess of
+ * this minSeqId to be applied, else it is skipped.
* @param reporter
* @return the sequence id of the last edit added to this region out of the
- * recovered edits log, or -1 if no log recovered
+ * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
* @throws IOException
*/
private long replayRecoveredEdits(final HLog.Reader reader,
final long minSeqId, final Progressable reporter)
throws IOException {
- long currentEditSeqId = -1;
+ long currentEditSeqId = minSeqId;
long firstSeqIdInLog = -1;
long skippedEdits = 0;
long editsCount = 0;
HLog.Entry entry;
Store store = null;
- // Get map of family name to maximum sequence id. Do it here up front
- // because as we progress, the sequence id can change if we happen to flush
- // The flush ups the seqid for the Store. The new seqid can cause us skip edits.
- Map<byte [], Long> familyToOriginalMaxSeqId = familyToMaxSeqId(this.stores);
// How many edits to apply before we send a progress report.
int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
while ((entry = reader.next()) != null) {
@@ -1920,12 +1821,13 @@ public class HRegion implements HeapSize
firstSeqIdInLog = key.getLogSeqNum();
}
// Now, figure if we should skip this edit.
- currentEditSeqId = Math.max(currentEditSeqId, key.getLogSeqNum());
- if (key.getLogSeqNum() <= minSeqId) {
+ if (key.getLogSeqNum() <= currentEditSeqId) {
skippedEdits++;
continue;
}
- for (KeyValue kv : val.getKeyValues()) {
+ currentEditSeqId = key.getLogSeqNum();
+ boolean flush = false;
+ for (KeyValue kv: val.getKeyValues()) {
// Check this edit is for me. Also, guard against writing the special
// METACOLUMN info such as HBASE::CACHEFLUSH entries
if (kv.matchingFamily(HLog.METAFAMILY) ||
@@ -1944,16 +1846,13 @@ public class HRegion implements HeapSize
skippedEdits++;
continue;
}
- // The edits' id has to be in excess of the original max seqid of the
- // targeted store.
- long storeMaxSeqId = familyToOriginalMaxSeqId.get(store.getFamily().getName());
- if (currentEditSeqId < storeMaxSeqId) {
- skippedEdits++;
- continue;
- }
- restoreEdit(kv);
+ // Once we are over the limit, restoreEdit will keep returning true to
+ // flush -- but don't flush until we've played all the kvs that make up
+ // the WALEdit.
+ flush = restoreEdit(store, kv);
editsCount++;
}
+ if (flush) internalFlushcache(null, currentEditSeqId);
// Every 'interval' edits, tell the reporter we're making progress.
// Have seen 60k edits taking 3minutes to complete.
@@ -1963,40 +1862,20 @@ public class HRegion implements HeapSize
}
if (LOG.isDebugEnabled()) {
LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits +
- ", firstSeqIdInLog=" + firstSeqIdInLog +
- ", maxSeqIdInLog=" + currentEditSeqId);
+ ", firstSequenceidInLog=" + firstSeqIdInLog +
+ ", maxSequenceidInLog=" + currentEditSeqId);
}
return currentEditSeqId;
}
- /*
- * @param stores
- * @return Map of family name to maximum sequenceid.
- */
- private Map<byte [], Long> familyToMaxSeqId(final Map<byte [], Store> stores) {
- Map<byte [], Long> map = new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
- for (Map.Entry<byte [], Store> e: stores.entrySet()) {
- map.put(e.getKey(), e.getValue().getMaxSequenceId());
- }
- return map;
- }
-
- /*
- * @param kv Apply this value to this region.
- * @throws IOException
+ /**
+ * Used by tests
+ * @param s Store to add edit to.
+ * @param kv KeyValue to add.
+ * @return True if we should flush.
*/
- // This method is protected so can be called from tests.
- protected void restoreEdit(final KeyValue kv) throws IOException {
- // This is really expensive to do per edit. Loads of object creation.
- // TODO: Optimization. Apply edits batched by family.
- Map<byte [], List<KeyValue>> map =
- new TreeMap<byte [], List<KeyValue>>(Bytes.BYTES_COMPARATOR);
- map.put(kv.getFamily(), Collections.singletonList(kv));
- if (kv.isDelete()) {
- delete(map, true);
- } else {
- put(map, true);
- }
+ protected boolean restoreEdit(final Store s, final KeyValue kv) {
+ return isFlushSize(this.memstoreSize.addAndGet(s.add(kv)));
}
/*
@@ -2069,7 +1948,12 @@ public class HRegion implements HeapSize
* @return The id of the held lock.
*/
public Integer obtainRowLock(final byte [] row) throws IOException {
- return internalObtainRowLock(row, true);
+ startRegionOperation();
+ try {
+ return internalObtainRowLock(row, true);
+ } finally {
+ closeRegionOperation();
+ }
}
/**
@@ -2079,7 +1963,12 @@ public class HRegion implements HeapSize
* @see HRegion#obtainRowLock(byte[])
*/
public Integer tryObtainRowLock(final byte[] row) throws IOException {
- return internalObtainRowLock(row, false);
+ startRegionOperation();
+ try {
+ return internalObtainRowLock(row, false);
+ } finally {
+ closeRegionOperation();
+ }
}
/**
@@ -2091,11 +1980,8 @@ public class HRegion implements HeapSize
private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
throws IOException {
checkRow(row);
- splitsAndClosesLock.readLock().lock();
+ startRegionOperation();
try {
- if (this.closed.get()) {
- throw new NotServingRegionException(this + " is closed");
- }
synchronized (lockedRows) {
while (lockedRows.contains(row)) {
if (!waitForLock) {
@@ -2129,7 +2015,7 @@ public class HRegion implements HeapSize
return lockId;
}
} finally {
- splitsAndClosesLock.readLock().unlock();
+ closeRegionOperation();
}
}
@@ -2193,24 +2079,9 @@ public class HRegion implements HeapSize
return lid;
}
- private void waitOnRowLocks() {
- synchronized (lockedRows) {
- while (!this.lockedRows.isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Waiting on " + this.lockedRows.size() + " row locks");
- }
- try {
- this.lockedRows.wait();
- } catch (InterruptedException e) {
- // Catch. Let while test determine loop-end.
- }
- }
- }
- }
-
public void bulkLoadHFile(String hfilePath, byte[] familyName)
throws IOException {
- splitsAndClosesLock.readLock().lock();
+ startRegionOperation();
try {
Store store = getStore(familyName);
if (store == null) {
@@ -2219,7 +2090,7 @@ public class HRegion implements HeapSize
}
store.bulkLoadHFile(hfilePath);
} finally {
- splitsAndClosesLock.readLock().unlock();
+ closeRegionOperation();
}
}
@@ -2312,24 +2183,24 @@ public class HRegion implements HeapSize
"after we renewed it. Could be caused by a very slow scanner " +
"or a lengthy garbage collection");
}
- if (closing.get() || closed.get()) {
- close();
- throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
- " is closing=" + closing.get() + " or closed=" + closed.get());
- }
+ startRegionOperation();
+ try {
- // This could be a new thread from the last time we called next().
- ReadWriteConsistencyControl.setThreadReadPoint(this.readPt);
+ // This could be a new thread from the last time we called next().
+ ReadWriteConsistencyControl.setThreadReadPoint(this.readPt);
- results.clear();
- boolean returnResult = nextInternal(limit);
+ results.clear();
+ boolean returnResult = nextInternal(limit);
- outResults.addAll(results);
- resetFilters();
- if (isFilterDone()) {
- return false;
+ outResults.addAll(results);
+ resetFilters();
+ if (isFilterDone()) {
+ return false;
+ }
+ return returnResult;
+ } finally {
+ closeRegionOperation();
}
- return returnResult;
}
public synchronized boolean next(List<KeyValue> outResults)
@@ -2669,17 +2540,6 @@ public class HRegion implements HeapSize
/**
* Computes the Path of the HRegion
*
- * @param tabledir qualified path for table
- * @param name ENCODED region name
- * @return Path of HRegion directory
- */
- public static Path getRegionDir(final Path tabledir, final String name) {
- return new Path(tabledir, name);
- }
-
- /**
- * Computes the Path of the HRegion
- *
* @param rootdir qualified path of HBase root directory
* @param info HRegionInfo for the region
* @return qualified path of region directory
@@ -2999,50 +2859,52 @@ public class HRegion implements HeapSize
checkRow(row);
boolean flush = false;
// Lock row
- Integer lid = obtainRowLock(row);
long result = amount;
+ startRegionOperation();
try {
- Store store = stores.get(family);
-
- // TODO call the proper GET API
- // Get the old value:
- Get get = new Get(row);
- get.addColumn(family, qualifier);
- List<KeyValue> results = new ArrayList<KeyValue>();
- NavigableSet<byte[]> qualifiers = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
- qualifiers.add(qualifier);
- store.get(get, qualifiers, results);
-
- if (!results.isEmpty()) {
- KeyValue kv = results.get(0);
- byte [] buffer = kv.getBuffer();
- int valueOffset = kv.getValueOffset();
- result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG);
- }
-
- // bulid the KeyValue now:
- KeyValue newKv = new KeyValue(row, family,
- qualifier, EnvironmentEdgeManager.currentTimeMillis(),
- Bytes.toBytes(result));
+ Integer lid = obtainRowLock(row);
+ try {
+ Store store = stores.get(family);
- // now log it:
- if (writeToWAL) {
- long now = EnvironmentEdgeManager.currentTimeMillis();
- WALEdit walEdit = new WALEdit();
- walEdit.add(newKv);
- this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
- walEdit, now);
- }
+ // Get the old value:
+ Get get = new Get(row);
+ get.addColumn(family, qualifier);
+
+ List<KeyValue> results = get(get);
+
+ if (!results.isEmpty()) {
+ KeyValue kv = results.get(0);
+ byte [] buffer = kv.getBuffer();
+ int valueOffset = kv.getValueOffset();
+ result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG);
+ }
+
+ // build the KeyValue now:
+ KeyValue newKv = new KeyValue(row, family,
+ qualifier, EnvironmentEdgeManager.currentTimeMillis(),
+ Bytes.toBytes(result));
+
+ // now log it:
+ if (writeToWAL) {
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ WALEdit walEdit = new WALEdit();
+ walEdit.add(newKv);
+ this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
+ walEdit, now);
+ }
- // Now request the ICV to the store, this will set the timestamp
- // appropriately depending on if there is a value in memcache or not.
- // returns the
- long size = store.updateColumnValue(row, family, qualifier, result);
+ // Now request the ICV to the store, this will set the timestamp
+ // appropriately depending on if there is a value in memcache or not.
+ // Returns the change in memstore size, which is added to memstoreSize below.
+ long size = store.updateColumnValue(row, family, qualifier, result);
- size = this.memstoreSize.addAndGet(size);
- flush = isFlushSize(size);
+ size = this.memstoreSize.addAndGet(size);
+ flush = isFlushSize(size);
+ } finally {
+ releaseRowLock(lid);
+ }
} finally {
- releaseRowLock(lid);
+ closeRegionOperation();
}
if (flush) {
@@ -3069,7 +2931,7 @@ public class HRegion implements HeapSize
public static final long FIXED_OVERHEAD = ClassSize.align(
(4 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN +
- (20 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
+ (18 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
ClassSize.OBJECT + (2 * ClassSize.ATOMIC_BOOLEAN) +
@@ -3187,6 +3049,34 @@ public class HRegion implements HeapSize
}
/**
+ * This method needs to be called before any public call that reads or
+ * modifies data. It has to be called just before a try.
+ * #closeRegionOperation needs to be called in the try's finally block
+ * Acquires a read lock and checks if the region is closing or closed.
+ * @throws NotServingRegionException when the region is closing or closed
+ */
+ private void startRegionOperation() throws NotServingRegionException {
+ if (this.closing.get()) {
+ throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
+ " is closing");
+ }
+ lock.readLock().lock();
+ if (this.closed.get()) {
+ lock.readLock().unlock();
+ throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
+ " is closed");
+ }
+ }
+
+ /**
+ * Releases the read lock. This needs to be called in the finally block corresponding
+ * to the try block of #startRegionOperation
+ */
+ private void closeRegionOperation(){
+ lock.readLock().unlock();
+ }
+
+ /**
* A mocked list implementaion - discards all updates.
*/
private static final List<KeyValue> MOCKED_LIST = new AbstractList<KeyValue>() {
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Aug 27 05:01:02 2010
@@ -49,6 +49,10 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -104,6 +108,7 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.handler.OpenRootHandler;
import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.InfoServer;
@@ -250,6 +255,9 @@ public class HRegionServer implements HR
// Instance of the hbase executor service.
private ExecutorService service;
+ // Replication services
+ private Replication replicationHandler;
+
/**
* Starts a HRegionServer at the default location
*
@@ -676,6 +684,14 @@ public class HRegionServer implements HR
+ this.serverInfo.getServerAddress() + ", Now=" + hsa.toString());
this.serverInfo.setServerAddress(hsa);
}
+
+ // hack! Maps DFSClient => RegionServer for logs. HDFS made this
+ // config param for task trackers, but we can piggyback off of it.
+ if (this.conf.get("mapred.task.id") == null) {
+ this.conf.set("mapred.task.id",
+ "hb_rs_" + this.serverInfo.getServerName());
+ }
+
// Master sent us hbase.rootdir to use. Should be fully qualified
// path with file system specification included. Set 'fs.defaultFS'
// to match the filesystem on hbase.rootdir else underlying hadoop hdfs
@@ -756,6 +772,11 @@ public class HRegionServer implements HR
* @return Throwable converted to an IOE; methods can only let out IOEs.
*/
private Throwable cleanup(final Throwable t, final String msg) {
+ // Don't log as error if NSRE; NSRE is 'normal' operation.
+ if (t instanceof NotServingRegionException) {
+ LOG.debug("NotServingRegionException; " + t.getMessage());
+ return t;
+ }
if (msg == null) {
LOG.error("", RemoteExceptionHandler.checkThrowable(t));
} else {
@@ -877,16 +898,19 @@ public class HRegionServer implements HR
+ "running at " + this.serverInfo.getServerName()
+ " because logdir " + logdir.toString() + " exists");
}
- HLog newlog = instantiateHLog(logdir, oldLogDir);
- return newlog;
+ this.replicationHandler = new Replication(this, this.fs, logdir, oldLogDir);
+ HLog log = instantiateHLog(logdir, oldLogDir);
+ this.replicationHandler.addLogEntryVisitor(log);
+ return log;
}
// instantiate
- protected HLog instantiateHLog(Path logdir, Path oldLogDir)
- throws IOException {
- HLog newlog = new HLog(fs, logdir, oldLogDir, conf, hlogRoller, null,
- serverInfo.getServerAddress().toString());
- return newlog;
+ protected HLog instantiateHLog(Path logdir, Path oldLogDir) throws IOException {
+ return new HLog(this.fs, logdir, oldLogDir, this.conf, this.hlogRoller,
+ this.replicationHandler != null?
+ this.replicationHandler.getReplicationManager():
+ null,
+ this.serverInfo.getServerAddress().toString());
}
protected LogRoller getLogRoller() {
@@ -1026,13 +1050,15 @@ public class HRegionServer implements HR
port++;
// update HRS server info port.
this.serverInfo = new HServerInfo(this.serverInfo.getServerAddress(),
- this.serverInfo.getStartCode(), port, this.serverInfo
- .getHostname());
+ this.serverInfo.getStartCode(), port,
+ this.serverInfo.getHostname());
}
}
}
- // Start Server. This service is like leases in that it internally runs
+ this.replicationHandler.startReplicationServices();
+
+ // Start Server. This service is like leases in that it internally runs
// a thread.
this.server.start();
LOG.info("HRegionServer started at: "
@@ -1119,7 +1145,7 @@ public class HRegionServer implements HR
this.abortRequested = true;
this.reservedSpace.clear();
if (this.metrics != null) {
- LOG.info("Dump of metrics: " + this.metrics.toString());
+ LOG.info("Dump of metrics: " + this.metrics);
}
stop(reason);
}
@@ -1151,6 +1177,7 @@ public class HRegionServer implements HR
Threads.shutdown(this.compactSplitThread);
Threads.shutdown(this.hlogRoller);
this.service.shutdown();
+ this.replicationHandler.join();
}
/**
@@ -1254,10 +1281,6 @@ public class HRegionServer implements HR
+ newRegionB.getRegionNameAsString())));
}
- // ////////////////////////////////////////////////////////////////////////////
- // HMaster-given operations
- // ////////////////////////////////////////////////////////////////////////////
-
/**
* Closes all regions. Called on our way out.
* Assumes that its not possible for new regions to be added to onlineRegions
@@ -2221,7 +2244,7 @@ public class HRegionServer implements HR
if (message != null) {
System.err.println(message);
}
- System.err.println("Usage: java org.apache.hbase.HRegionServer start|stop");
+ System.err.println("Usage: java org.apache.hbase.HRegionServer start|stop [-D <conf.param=value>]");
System.exit(0);
}
@@ -2245,6 +2268,11 @@ public class HRegionServer implements HR
}
}
+ @Override
+ public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
+ this.replicationHandler.replicateLogEntries(entries);
+ }
+
/**
* Do class main.
*
@@ -2254,15 +2282,26 @@ public class HRegionServer implements HR
*/
protected static void doMain(final String[] args,
final Class<? extends HRegionServer> regionServerClass) {
- if (args.length < 1) {
- printUsageAndExit();
- }
Configuration conf = HBaseConfiguration.create();
- // Process command-line args. TODO: Better cmd-line processing
- // (but hopefully something not as painful as cli options).
- for (String cmd : args) {
- if (cmd.equals("start")) {
+ Options opt = new Options();
+ opt.addOption("D", true, "Override HBase Configuration Settings");
+ try {
+ CommandLine cmd = new GnuParser().parse(opt, args);
+
+ if (cmd.hasOption("D")) {
+ for (String confOpt : cmd.getOptionValues("D")) {
+ String[] kv = confOpt.split("=", 2);
+ if (kv.length == 2) {
+ conf.set(kv[0], kv[1]);
+ LOG.debug("-D configuration override: " + kv[0] + "=" + kv[1]);
+ } else {
+ throw new ParseException("-D option format invalid: " + confOpt);
+ }
+ }
+ }
+
+ if (cmd.getArgList().contains("start")) {
try {
// If 'local', don't start a region server here. Defer to
// LocalHBaseCluster. It manages 'local' clusters.
@@ -2278,19 +2317,20 @@ public class HRegionServer implements HR
startRegionServer(hrs);
}
} catch (Throwable t) {
- LOG.error("Can not start region server because "
- + StringUtils.stringifyException(t));
+ LOG.error( "Can not start region server because "+
+ StringUtils.stringifyException(t) );
+ System.exit(-1);
}
- break;
- }
-
- if (cmd.equals("stop")) {
- printUsageAndExit("To shutdown the regionserver run "
- + "bin/hbase-daemon.sh stop regionserver or send a kill signal to"
- + "the regionserver pid");
+ } else if (cmd.getArgList().contains("stop")) {
+ throw new ParseException("To shutdown the regionserver run " +
+ "bin/hbase-daemon.sh stop regionserver or send a kill signal to" +
+ "the regionserver pid");
+ } else {
+ throw new ParseException("Unknown argument(s): " +
+ org.apache.commons.lang.StringUtils.join(cmd.getArgs(), " "));
}
-
- // Print out usage if we get to here.
+ } catch (ParseException e) {
+ LOG.error("Could not parse", e);
printUsageAndExit();
}
}
@@ -2305,4 +2345,8 @@ public class HRegionServer implements HR
.getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
doMain(args, regionServerClass);
}
+
+ public int getNumberOfOnlineRegions() {
+ return onlineRegions.size();
+ }
}
\ No newline at end of file
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java Fri Aug 27 05:01:02 2010
@@ -41,7 +41,7 @@ import java.util.PriorityQueue;
* as an InternalScanner at the Store level, you will get runtime exceptions.
*/
public class KeyValueHeap implements KeyValueScanner, InternalScanner {
- private PriorityQueue<KeyValueScanner> heap;
+ private PriorityQueue<KeyValueScanner> heap = null;
private KeyValueScanner current = null;
private KVScannerComparator comparator;
@@ -51,22 +51,25 @@ public class KeyValueHeap implements Key
* @param scanners
* @param comparator
*/
- public KeyValueHeap(List<? extends KeyValueScanner> scanners, KVComparator comparator) {
+ public KeyValueHeap(List<? extends KeyValueScanner> scanners,
+ KVComparator comparator) {
this.comparator = new KVScannerComparator(comparator);
- this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
- this.comparator);
- for (KeyValueScanner scanner : scanners) {
- if (scanner.peek() != null) {
- this.heap.add(scanner);
- } else {
- scanner.close();
+ if (!scanners.isEmpty()) {
+ this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
+ this.comparator);
+ for (KeyValueScanner scanner : scanners) {
+ if (scanner.peek() != null) {
+ this.heap.add(scanner);
+ } else {
+ scanner.close();
+ }
}
+ this.current = heap.poll();
}
- this.current = heap.poll();
}
public KeyValue peek() {
- if(this.current == null) {
+ if (this.current == null) {
return null;
}
return this.current.peek();
@@ -78,12 +81,12 @@ public class KeyValueHeap implements Key
}
KeyValue kvReturn = this.current.next();
KeyValue kvNext = this.current.peek();
- if(kvNext == null) {
+ if (kvNext == null) {
this.current.close();
this.current = this.heap.poll();
} else {
KeyValueScanner topScanner = this.heap.peek();
- if(topScanner == null ||
+ if (topScanner == null ||
this.comparator.compare(kvNext, topScanner.peek()) > 0) {
this.heap.add(this.current);
this.current = this.heap.poll();
@@ -104,10 +107,20 @@ public class KeyValueHeap implements Key
* @return true if there are more keys, false if all scanners are done
*/
public boolean next(List<KeyValue> result, int limit) throws IOException {
+ if (this.current == null) {
+ return false;
+ }
InternalScanner currentAsInternal = (InternalScanner)this.current;
- currentAsInternal.next(result, limit);
+ boolean mayContainsMoreRows = currentAsInternal.next(result, limit);
KeyValue pee = this.current.peek();
- if (pee == null) {
+ /*
+ * By definition, any InternalScanner must return false only when it has no
+ * further rows to be fetched. So, we can close a scanner if it returns
+ * false. All existing implementations seem to be fine with this. It is much
+ * more efficient to close scanners which are not needed than keep them in
+ * the heap. This is also required for certain optimizations.
+ */
+ if (pee == null || !mayContainsMoreRows) {
this.current.close();
} else {
this.heap.add(this.current);
@@ -160,12 +173,14 @@ public class KeyValueHeap implements Key
}
public void close() {
- if(this.current != null) {
+ if (this.current != null) {
this.current.close();
}
- KeyValueScanner scanner;
- while((scanner = this.heap.poll()) != null) {
- scanner.close();
+ if (this.heap != null) {
+ KeyValueScanner scanner;
+ while ((scanner = this.heap.poll()) != null) {
+ scanner.close();
+ }
}
}
@@ -178,10 +193,10 @@ public class KeyValueHeap implements Key
* automatically closed and removed from the heap.
* @param seekKey KeyValue to seek at or after
* @return true if KeyValues exist at or after specified key, false if not
- * @throws IOException
+ * @throws IOException
*/
public boolean seek(KeyValue seekKey) throws IOException {
- if(this.current == null) {
+ if (this.current == null) {
return false;
}
this.heap.add(this.current);
@@ -205,6 +220,33 @@ public class KeyValueHeap implements Key
return false;
}
+ public boolean reseek(KeyValue seekKey) throws IOException {
+ // This function is nearly identical to the seek(KeyValue) function except that
+ //scanner.seek(seekKey) is changed to scanner.reseek(seekKey)
+ if (this.current == null) {
+ return false;
+ }
+ this.heap.add(this.current);
+ this.current = null;
+
+ KeyValueScanner scanner;
+ while ((scanner = this.heap.poll()) != null) {
+ KeyValue topKey = scanner.peek();
+ if (comparator.getComparator().compare(seekKey, topKey) <= 0) {
+ // Top KeyValue is at-or-after Seek KeyValue
+ this.current = scanner;
+ return true;
+ }
+ if (!scanner.reseek(seekKey)) {
+ scanner.close();
+ } else {
+ this.heap.add(scanner);
+ }
+ }
+ // Heap is returning empty, scanner is done
+ return false;
+ }
+
/**
* @return the current Heap
*/
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java Fri Aug 27 05:01:02 2010
@@ -47,6 +47,16 @@ public interface KeyValueScanner {
public boolean seek(KeyValue key) throws IOException;
/**
+ * Reseek the scanner at or after the specified KeyValue.
+ * This method is guaranteed to seek at or after the required key only if the
+ * key comes after the current position of the scanner. Should not be used
+ * to seek to a key which may come before the current position.
+ * @param key seek value (should be non-null)
+ * @return true if scanner has values left, false if end of scanner
+ */
+ public boolean reseek(KeyValue key) throws IOException;
+
+ /**
* Close the KeyValue scanner.
*/
public void close();
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Fri Aug 27 05:01:02 2010
@@ -20,18 +20,14 @@
package org.apache.hadoop.hbase.regionserver;
-import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.rmi.UnexpectedException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
-import java.util.Set;
import java.util.SortedSet;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -39,8 +35,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.regionserver.DeleteCompare.DeleteCode;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -81,6 +77,9 @@ public class MemStore implements HeapSiz
// Used to track own heapSize
final AtomicLong size;
+ TimeRangeTracker timeRangeTracker;
+ TimeRangeTracker snapshotTimeRangeTracker;
+
/**
* Default constructor. Used for tests.
*/
@@ -99,6 +98,8 @@ public class MemStore implements HeapSiz
this.comparatorIgnoreType = this.comparator.getComparatorIgnoringType();
this.kvset = new KeyValueSkipListSet(c);
this.snapshot = new KeyValueSkipListSet(c);
+ timeRangeTracker = new TimeRangeTracker();
+ snapshotTimeRangeTracker = new TimeRangeTracker();
this.size = new AtomicLong(DEEP_OVERHEAD);
}
@@ -128,6 +129,8 @@ public class MemStore implements HeapSiz
if (!this.kvset.isEmpty()) {
this.snapshot = this.kvset;
this.kvset = new KeyValueSkipListSet(this.comparator);
+ this.snapshotTimeRangeTracker = this.timeRangeTracker;
+ this.timeRangeTracker = new TimeRangeTracker();
// Reset heap to not include any keys
this.size.set(DEEP_OVERHEAD);
}
@@ -167,6 +170,7 @@ public class MemStore implements HeapSiz
// create a new snapshot and let the old one go.
if (!ss.isEmpty()) {
this.snapshot = new KeyValueSkipListSet(this.comparator);
+ this.snapshotTimeRangeTracker = new TimeRangeTracker();
}
} finally {
this.lock.writeLock().unlock();
@@ -183,6 +187,7 @@ public class MemStore implements HeapSiz
this.lock.readLock().lock();
try {
s = heapSizeChange(kv, this.kvset.add(kv));
+ timeRangeTracker.includeTimestamp(kv);
this.size.addAndGet(s);
} finally {
this.lock.readLock().unlock();
@@ -198,9 +203,9 @@ public class MemStore implements HeapSiz
long delete(final KeyValue delete) {
long s = 0;
this.lock.readLock().lock();
-
try {
s += heapSizeChange(delete, this.kvset.add(delete));
+ timeRangeTracker.includeTimestamp(delete);
} finally {
this.lock.readLock().unlock();
}
@@ -341,6 +346,112 @@ public class MemStore implements HeapSiz
}
}
+ /**
+ * Given the specs of a column, update it, first by inserting a new record,
+ * then removing the old one. Since there is only 1 KeyValue involved, the memstoreTS
+ * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
+ * store will ensure that the insert/delete each are atomic. A scanner/reader will either
+ * get the new value, or the old value and all readers will eventually only see the new
+ * value after the old was removed.
+ *
+ * @param row
+ * @param family
+ * @param qualifier
+ * @param newValue
+ * @param now
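+ * @return the net change in heap size: the size added for the new KeyValue minus the size of the old Puts removed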
+ */
+ public long updateColumnValue(byte[] row,
+ byte[] family,
+ byte[] qualifier,
+ long newValue,
+ long now) {
+ this.lock.readLock().lock();
+ try {
+ KeyValue firstKv = KeyValue.createFirstOnRow(
+ row, family, qualifier);
+ // create a new KeyValue with 'now' and a 0 memstoreTS == immediately visible
+ KeyValue newKv;
+ // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
+ SortedSet<KeyValue> snSs = snapshot.tailSet(firstKv);
+ if (!snSs.isEmpty()) {
+ KeyValue snKv = snSs.first();
+ // is there a matching KV in the snapshot?
+ if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
+ if (snKv.getTimestamp() == now) {
+ // same timestamp as the matching KeyValue in the snapshot; bump ours by one so they differ
+ now += 1;
+ }
+ }
+ }
+
+ // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
+ // But the timestamp should also be max(now, mostRecentTsInMemstore)
+
+ // so we cant add the new KV w/o knowing what's there already, but we also
+ // want to take this chance to delete some kvs. So two loops (sad)
+
+ SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
+ Iterator<KeyValue> it = ss.iterator();
+ while ( it.hasNext() ) {
+ KeyValue kv = it.next();
+
+ // if this isnt the row we are interested in, then bail:
+ if (!firstKv.matchingRow(kv)) {
+ break; // rows dont match, bail.
+ }
+
+ // if the qualifier matches and it's a put, make sure 'now' is at least its timestamp.
+ if (firstKv.matchingQualifier(kv)) {
+ // only Puts are considered here; nothing is removed from the kvset in this first pass
+ if (kv.getType() == KeyValue.Type.Put.getCode()) {
+ now = Math.max(now, kv.getTimestamp());
+ }
+ }
+ }
+
+
+ // add the new value now. this might have the same TS as an existing KV, thus confusing
+ // readers slightly for a MOMENT until we erase the old one (and thus old value).
+ newKv = new KeyValue(row, family, qualifier,
+ now,
+ Bytes.toBytes(newValue));
+ long addedSize = add(newKv);
+
+ // remove extra versions.
+ ss = kvset.tailSet(firstKv);
+ it = ss.iterator();
+ while ( it.hasNext() ) {
+ KeyValue kv = it.next();
+
+ if (kv == newKv) {
+ // ignore the one i just put in (heh)
+ continue;
+ }
+
+ // if this isnt the row we are interested in, then bail:
+ if (!firstKv.matchingRow(kv)) {
+ break; // rows dont match, bail.
+ }
+
+ // if the qualifier matches and it's a put, just RM it out of the kvset.
+ if (firstKv.matchingQualifier(kv)) {
+ // to be extra safe we only remove Puts that have a memstoreTS==0
+ if (kv.getType() == KeyValue.Type.Put.getCode()) {
+ // false means there was a change, so give us the size.
+ addedSize -= heapSizeChange(kv, false);
+
+ it.remove();
+ }
+ }
+ }
+
+ return addedSize;
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
/*
* Immutable data structure to hold member found in set and the set it was
* found in. Include set because it is carrying context.
@@ -390,104 +501,20 @@ public class MemStore implements HeapSiz
}
}
- //
- // HBASE-880/1249/1304
- //
-
- /**
- * Perform a single-row Get on the and snapshot, placing results
- * into the specified KV list.
- * <p>
- * This will return true if it is determined that the query is complete
- * and it is not necessary to check any storefiles after this.
- * <p>
- * Otherwise, it will return false and you should continue on.
- * @param matcher Column matcher
- * @param result List to add results to
- * @return true if done with store (early-out), false if not
- */
- public boolean get(QueryMatcher matcher, List<KeyValue> result) {
- this.lock.readLock().lock();
- try {
- if(internalGet(this.kvset, matcher, result) || matcher.isDone()) {
- return true;
- }
- matcher.update();
- return internalGet(this.snapshot, matcher, result) || matcher.isDone();
- } finally {
- this.lock.readLock().unlock();
- }
- }
-
/**
- * Gets from either the memstore or the snapshop, and returns a code
- * to let you know which is which.
- *
- * @param matcher query matcher
- * @param result puts results here
- * @return 1 == memstore, 2 == snapshot, 0 == none
+ * Check if this memstore may contain the required keys
+ * @param scan
+ * @return False if the key definitely does not exist in this Memstore
*/
- int getWithCode(QueryMatcher matcher, List<KeyValue> result) {
- this.lock.readLock().lock();
- try {
- boolean fromMemstore = internalGet(this.kvset, matcher, result);
- if (fromMemstore || matcher.isDone())
- return 1;
-
- matcher.update();
- boolean fromSnapshot = internalGet(this.snapshot, matcher, result);
- if (fromSnapshot || matcher.isDone())
- return 2;
-
- return 0;
- } finally {
- this.lock.readLock().unlock();
- }
+ public boolean shouldSeek(Scan scan) {
+   // timeRangeTracker is asked first; snapshotTimeRangeTracker is consulted
+   // only when the first tracker does not include the scan's time range.
+   if (timeRangeTracker.includesTimeRange(scan.getTimeRange())) {
+     return true;
+   }
+   return snapshotTimeRangeTracker.includesTimeRange(scan.getTimeRange());
}
- /**
- * Small utility functions for use by Store.incrementColumnValue
- * _only_ under the threat of pain and everlasting race conditions.
- */
- void readLockLock() {
- this.lock.readLock().lock();
- }
- void readLockUnlock() {
- this.lock.readLock().unlock();
+ public TimeRangeTracker getSnapshotTimeRangeTracker() {
+   // Hands back the tracker field itself; no copy is made.
+   return snapshotTimeRangeTracker;
}
- /**
- *
- * @param set memstore or snapshot
- * @param matcher query matcher
- * @param result list to add results to
- * @return true if done with store (early-out), false if not
- */
- boolean internalGet(final NavigableSet<KeyValue> set,
- final QueryMatcher matcher, final List<KeyValue> result) {
- if(set.isEmpty()) return false;
- // Seek to startKey
- SortedSet<KeyValue> tail = set.tailSet(matcher.getStartKey());
- for (KeyValue kv : tail) {
- QueryMatcher.MatchCode res = matcher.match(kv);
- switch(res) {
- case INCLUDE:
- result.add(kv);
- break;
- case SKIP:
- break;
- case NEXT:
- return false;
- case DONE:
- return true;
- default:
- throw new RuntimeException("Unexpected " + res);
- }
- }
- return false;
- }
-
-
/*
* MemStoreScanner implements the KeyValueScanner.
* It lets the caller scan the contents of a memstore -- both current
@@ -520,7 +547,7 @@ public class MemStore implements HeapSiz
StoreScanner level with coordination with MemStoreScanner.
*/
-
+
MemStoreScanner() {
super();
@@ -531,7 +558,7 @@ public class MemStore implements HeapSiz
KeyValue ret = null;
long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
//DebugPrint.println( " MS@" + hashCode() + ": threadpoint = " + readPoint);
-
+
while (ret == null && it.hasNext()) {
KeyValue v = it.next();
if (v.getMemstoreTS() <= readPoint) {
@@ -566,13 +593,27 @@ public class MemStore implements HeapSiz
//DebugPrint.println( " MS@" + hashCode() + " snapshot seek: " + snapshotNextRow + " with size = " +
// snapshot.size() + " threadread = " + readPoint);
-
+
KeyValue lowest = getLowest();
// has data := (lowest != null)
return lowest != null;
}
+ /**
+  * Advances this scanner to the first entry that does not sort before
+  * {@code key}. The kvset cursor and the snapshot cursor are stepped
+  * independently, and a cursor that is already at or past {@code key} is
+  * left where it is, so this call never moves backwards.
+  * <p>
+  * Declared synchronized, like {@code peek()}, because it rewrites the
+  * kvsetNextRow / snapshotNextRow cursor fields (presumably the same state
+  * that getLowest() reads -- confirm).
+  *
+  * @param key key to advance to
+  * @return true if at least one cursor still points at an entry, false if
+  *         both are exhausted
+  */
+ @Override
+ public synchronized boolean reseek(KeyValue key) {
+   // Skip kvset entries that sort strictly before the requested key.
+   while (kvsetNextRow != null &&
+       comparator.compare(kvsetNextRow, key) < 0) {
+     kvsetNextRow = getNext(kvsetIt);
+   }
+
+   // Same for the snapshot side.
+   while (snapshotNextRow != null &&
+       comparator.compare(snapshotNextRow, key) < 0) {
+     snapshotNextRow = getNext(snapshotIt);
+   }
+   return (kvsetNextRow != null || snapshotNextRow != null);
+ }
+
public synchronized KeyValue peek() {
//DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
return getLowest();
@@ -630,8 +671,8 @@ public class MemStore implements HeapSiz
}
public final static long FIXED_OVERHEAD = ClassSize.align(
- ClassSize.OBJECT + (7 * ClassSize.REFERENCE));
-
+ ClassSize.OBJECT + (9 * ClassSize.REFERENCE));
+
public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST +
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java Fri Aug 27 05:01:02 2010
@@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.regionse
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.hfile.HFile;
import java.io.IOException;
import java.util.List;
@@ -73,6 +72,10 @@ public class MinorCompactingStoreScanner
throw new UnsupportedOperationException("Can't seek a MinorCompactingStoreScanner");
}
+ /**
+  * Reseeking is handled exactly like a plain seek: the call is forwarded to
+  * {@code seek(key)} and whatever that does (return or throw) is the outcome.
+  * @param key key to position at
+  * @return the result of {@code seek(key)}
+  */
+ public boolean reseek(KeyValue key) {
+   final boolean seekResult = seek(key);
+   return seekResult;
+ }
+
/**
* High performance merge scan.
* @param writer
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java Fri Aug 27 05:01:02 2010
@@ -41,18 +41,39 @@ public class ReadWriteConsistencyControl
private static final ThreadLocal<Long> perThreadReadPoint =
new ThreadLocal<Long>();
+ /**
+  * Get this thread's read point. Used primarily by the memstore scanner to
+  * know which values to skip (ie: have not been completed/committed to
+  * memstore).
+  * <p>
+  * NOTE(review): the value is read from a thread-local {@code Long} and
+  * unboxed to {@code long}; if no read point was ever set on the calling
+  * thread the unboxing would presumably throw a NullPointerException --
+  * confirm that every caller sets or resets the read point first.
+  * @return the read point currently stored for the calling thread
+  */
public static long getThreadReadPoint() {
return perThreadReadPoint.get();
}
+ /**
+  * Set the calling thread's read point to the given value. The value is
+  * stored per thread, so other threads are not affected. The thread RWCC
+  * read point is used by the Memstore scanner so it knows which values to
+  * skip. Give it a value of 0 if you want everything.
+  * @param readPoint value to store as the calling thread's read point
+  */
public static void setThreadReadPoint(long readPoint) {
perThreadReadPoint.set(readPoint);
}
+ /**
+  * Set the thread RWCC read point to whatever the current read point is in
+  * this particular instance of RWCC, as reported by
+  * {@code rwcc.memstoreReadPoint()}.
+  * @param rwcc instance whose current memstore read point is copied
+  * @return the calling thread's read point after the update
+  */
public static long resetThreadReadPoint(ReadWriteConsistencyControl rwcc) {
perThreadReadPoint.set(rwcc.memstoreReadPoint());
return getThreadReadPoint();
}
+
+ /**
+  * Resets the calling thread's RWCC read point to 0 (documented as
+  * "include everything"). Only the calling thread's value is touched.
+  */
+ public static void resetThreadReadPoint() {
+   final long resetValue = 0L;
+   perThreadReadPoint.set(resetValue);
+ }
public WriteEntry beginMemstoreInsert() {
synchronized (writeQueue) {