Posted to commits@hbase.apache.org by st...@apache.org on 2010/08/27 07:01:07 UTC

svn commit: r990018 [3/10] - in /hbase/branches/0.90_master_rewrite: ./ bin/ bin/replication/ src/assembly/ src/docbkx/ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/filter/ s...

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Aug 27 05:01:02 2010
@@ -26,7 +26,6 @@ import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -35,7 +34,6 @@ import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -68,7 +66,6 @@ import org.apache.hadoop.hbase.client.Sc
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
 import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.io.Reference.Range;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -124,8 +121,8 @@ import com.google.common.collect.Lists;
  */
 public class HRegion implements HeapSize { // , Writable{
   public static final Log LOG = LogFactory.getLog(HRegion.class);
-  static final String SPLITDIR = "splits";
   static final String MERGEDIR = "merges";
+
   final AtomicBoolean closed = new AtomicBoolean(false);
   /* Closing can take some time; use the closing flag if there is stuff we don't
    * want to do while in closing state; e.g. like offer this region up to the
@@ -218,15 +215,12 @@ public class HRegion implements HeapSize
   private final long blockingMemStoreSize;
   final long threadWakeFrequency;
   // Used to guard splits and closes
-  private final ReentrantReadWriteLock splitsAndClosesLock =
-    new ReentrantReadWriteLock();
-  private final ReentrantReadWriteLock newScannerLock =
+  final ReentrantReadWriteLock lock =
     new ReentrantReadWriteLock();
 
   // Stop updates lock
   private final ReentrantReadWriteLock updatesLock =
     new ReentrantReadWriteLock();
-  private final Object splitLock = new Object();
   private boolean splitRequest;
 
   private final ReadWriteConsistencyControl rwcc =
@@ -288,7 +282,7 @@ public class HRegion implements HeapSize
     this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY,
         10 * 1000);
     String encodedNameStr = this.regionInfo.getEncodedName();
-    this.regiondir = new Path(tableDir, encodedNameStr);
+    this.regiondir = getRegionDir(this.tableDir, encodedNameStr);
     if (LOG.isDebugEnabled()) {
       // Write out region name as string and its encoded name.
       LOG.debug("Creating region " + this);
@@ -321,15 +315,18 @@ public class HRegion implements HeapSize
    */
   public long initialize(final Progressable reporter)
   throws IOException {
+    // A region can be reopened if it failed a split; reset flags
+    this.closing.set(false);
+    this.closed.set(false);
+
     // Write HRI to a file in case we need to recover .META.
     checkRegioninfoOnFilesystem();
 
     // Remove temporary data left over from old regions
     cleanupTmpDir();
-    
-    // Load in all the HStores.  Get min and max seqids across all families.
+
+    // Load in all the HStores.  Get maximum seqid.
     long maxSeqId = -1;
-    long minSeqId = Integer.MAX_VALUE;
     for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
       Store store = instantiateHStore(this.tableDir, c);
       this.stores.put(c.getName(), store);
@@ -337,17 +334,14 @@ public class HRegion implements HeapSize
       if (storeSeqId > maxSeqId) {
         maxSeqId = storeSeqId;
       }
-      if (minSeqId > storeSeqId) {
-        minSeqId = storeSeqId;
-      }
     }
     // Recover any edits if available.
-    long seqid = replayRecoveredEditsIfAny(this.regiondir, minSeqId, reporter);
+    maxSeqId = replayRecoveredEditsIfAny(this.regiondir, maxSeqId, reporter);
 
     // Get rid of any splits or merges that were lost in-progress.  Clean out
     // these directories here on open.  We may be opening a region that was
     // being split but we crashed in the middle of it all.
-    FSUtils.deleteDirectory(this.fs, new Path(regiondir, SPLITDIR));
+    SplitTransaction.cleanupAnySplitDetritus(this);
     FSUtils.deleteDirectory(this.fs, new Path(regiondir, MERGEDIR));
 
     // See if region is meant to run read-only.
@@ -359,7 +353,7 @@ public class HRegion implements HeapSize
     this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
     // Use maximum of log sequenceid or that which was found in stores
     // (particularly if no recovered edits, seqid will be -1).
-    long nextSeqid = Math.max(seqid, maxSeqId) + 1;
+    long nextSeqid = maxSeqId + 1;
     LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);
     return nextSeqid;
   }
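
The simplification above drops the per-store minimum: replay now starts from
the maximum store seqid and returns at least that value, so the next
sequenceid is simply the replay result plus one. A standalone sketch of the
bookkeeping, with illustrative names only (not the HBase API):

    import java.util.Arrays;
    import java.util.List;

    class SeqidSketch {
      // Fold the max seqid across stores, let replay advance it, add one.
      static long nextSeqid(List<Long> storeSeqids, long lastReplayedSeqid) {
        long maxSeqId = -1;
        for (long s : storeSeqids) {
          if (s > maxSeqId) maxSeqId = s;
        }
        // replayRecoveredEditsIfAny returns max(maxSeqId, last applied edit).
        maxSeqId = Math.max(maxSeqId, lastReplayedSeqid);
        return maxSeqId + 1;
      }

      public static void main(String[] args) {
        // Stores flushed at seqids 5 and 9; replay applied edits up to 12.
        System.out.println(nextSeqid(Arrays.asList(5L, 9L), 12L)); // 13
      }
    }
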
@@ -370,7 +364,7 @@ public class HRegion implements HeapSize
    * @param initialFiles
    * @throws IOException
    */
-  private static void moveInitialFilesIntoPlace(final FileSystem fs,
+  static void moveInitialFilesIntoPlace(final FileSystem fs,
     final Path initialFiles, final Path regiondir)
   throws IOException {
     if (initialFiles != null && fs.exists(initialFiles)) {
@@ -469,70 +463,61 @@ public class HRegion implements HeapSize
    *
    * @throws IOException e
    */
-  public List<StoreFile> close(final boolean abort) throws IOException {
+  public List<StoreFile> close(final boolean abort)
+  throws IOException {
     if (isClosed()) {
-      LOG.warn("region " + this + " already closed");
+      LOG.warn("Region " + this + " already closed");
       return null;
     }
-    synchronized (splitLock) {
-      boolean wasFlushing = false;
-      synchronized (writestate) {
-        // Disable compacting and flushing by background threads for this
-        // region.
-        writestate.writesEnabled = false;
-        wasFlushing = writestate.flushing;
-        LOG.debug("Closing " + this + ": disabling compactions & flushes");
-        while (writestate.compacting || writestate.flushing) {
-          LOG.debug("waiting for" +
-              (writestate.compacting ? " compaction" : "") +
-              (writestate.flushing ?
-                  (writestate.compacting ? "," : "") + " cache flush" :
-                    "") + " to complete for region " + this);
-          try {
-            writestate.wait();
-          } catch (InterruptedException iex) {
-            // continue
-          }
+    boolean wasFlushing = false;
+    synchronized (writestate) {
+      // Disable compacting and flushing by background threads for this
+      // region.
+      writestate.writesEnabled = false;
+      wasFlushing = writestate.flushing;
+      LOG.debug("Closing " + this + ": disabling compactions & flushes");
+      while (writestate.compacting || writestate.flushing) {
+        LOG.debug("waiting for" +
+          (writestate.compacting ? " compaction" : "") +
+          (writestate.flushing ?
+            (writestate.compacting ? "," : "") + " cache flush" :
+              "") + " to complete for region " + this);
+        try {
+          writestate.wait();
+        } catch (InterruptedException iex) {
+          // continue
         }
       }
-      // If we were not just flushing, is it worth doing a preflush...one
-      // that will clear out of the bulk of the memstore before we put up
-      // the close flag?
-      if (!abort && !wasFlushing && worthPreFlushing()) {
-        LOG.info("Running close preflush of " + this.getRegionNameAsString());
+    }
+    // If we were not just flushing, is it worth doing a preflush...one
+    // that will clear out of the bulk of the memstore before we put up
+    // the close flag?
+    if (!abort && !wasFlushing && worthPreFlushing()) {
+      LOG.info("Running close preflush of " + this.getRegionNameAsString());
+      internalFlushcache();
+    }
+    this.closing.set(true);
+    lock.writeLock().lock();
+    try {
+      if (this.isClosed()) {
+        // SplitTransaction handles the null
+        return null;
+      }
+      LOG.debug("Updates disabled for region " + this);
+      // Don't flush the cache if we are aborting
+      if (!abort) {
         internalFlushcache();
       }
-      newScannerLock.writeLock().lock();
-      this.closing.set(true);
-      try {
-        splitsAndClosesLock.writeLock().lock();
-        LOG.debug("Updates disabled for region, no outstanding scanners on " +
-          this);
-        try {
-          // Write lock means no more row locks can be given out.  Wait on
-          // outstanding row locks to come in before we close so we do not drop
-          // outstanding updates.
-          waitOnRowLocks();
-          LOG.debug("No more row locks outstanding on region " + this);
-
-          // Don't flush the cache if we are aborting
-          if (!abort) {
-            internalFlushcache();
-          }
 
-          List<StoreFile> result = new ArrayList<StoreFile>();
-          for (Store store: stores.values()) {
-            result.addAll(store.close());
-          }
-          this.closed.set(true);
-          LOG.info("Closed " + this);
-          return result;
-        } finally {
-          splitsAndClosesLock.writeLock().unlock();
-        }
-      } finally {
-        newScannerLock.writeLock().unlock();
-      }
+      List<StoreFile> result = new ArrayList<StoreFile>();
+      for (Store store : stores.values()) {
+        result.addAll(store.close());
+      }
+      this.closed.set(true);
+      LOG.info("Closed " + this);
+      return result;
+    } finally {
+      lock.writeLock().unlock();
     }
   }
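
With splitLock, newScannerLock and splitsAndClosesLock collapsed into the
single lock, the close sequence reduces to: optional pre-flush while writers
may still run, set the closing flag, then flush and close stores under the
write lock. A minimal sketch of that ordering (stand-in types, not the HBase
classes):

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class CloseSketch {
      final AtomicBoolean closing = new AtomicBoolean(false);
      final AtomicBoolean closed = new AtomicBoolean(false);
      final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

      void close(boolean abort) {
        if (!abort) flush();         // pre-flush clears the bulk of the memstore
        closing.set(true);           // new operations now fail fast
        lock.writeLock().lock();     // waits out in-flight region operations
        try {
          if (closed.get()) return;  // already closed by someone else
          if (!abort) flush();       // catch edits that raced the pre-flush
          closed.set(true);
        } finally {
          lock.writeLock().unlock();
        }
      }

      void flush() { /* snapshot memstore, write out store files */ }
    }
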
 
@@ -593,6 +578,17 @@ public class HRegion implements HeapSize
     return this.regiondir;
   }
 
+  /**
+   * Computes the Path of the HRegion
+   *
+   * @param tabledir qualified path for table
+   * @param name ENCODED region name
+   * @return Path of HRegion directory
+   */
+  public static Path getRegionDir(final Path tabledir, final String name) {
+    return new Path(tabledir, name);
+  }
+
   /** @return FileSystem being used by this region */
   public FileSystem getFilesystem() {
     return this.fs;
@@ -623,113 +619,6 @@ public class HRegion implements HeapSize
   }
 
   /*
-   * Split the HRegion to create two brand-new ones.  This also closes
-   * current HRegion.  Split should be fast since we don't rewrite store files
-   * but instead create new 'reference' store files that read off the top and
-   * bottom ranges of parent store files.
-   * @param splitRow row on which to split region
-   * @return two brand-new HRegions or null if a split is not needed
-   * @throws IOException
-   */
-  HRegion [] splitRegion(final byte [] splitRow) throws IOException {
-    prepareToSplit();
-    synchronized (splitLock) {
-      if (closed.get()) {
-        return null;
-      }
-      // Add start/end key checking: hbase-428.
-      byte [] startKey = this.regionInfo.getStartKey();
-      byte [] endKey = this.regionInfo.getEndKey();
-      if (this.comparator.matchingRows(startKey, 0, startKey.length,
-          splitRow, 0, splitRow.length)) {
-        LOG.debug("Startkey and midkey are same, not splitting");
-        return null;
-      }
-      if (this.comparator.matchingRows(splitRow, 0, splitRow.length,
-          endKey, 0, endKey.length)) {
-        LOG.debug("Endkey and midkey are same, not splitting");
-        return null;
-      }
-      LOG.info("Starting split of region " + this);
-      Path splits = new Path(this.regiondir, SPLITDIR);
-      if(!this.fs.exists(splits)) {
-        this.fs.mkdirs(splits);
-      }
-      // Calculate regionid to use.  Can't be less than that of parent else
-      // it'll insert into wrong location over in .META. table: HBASE-710.
-      long rid = EnvironmentEdgeManager.currentTimeMillis();
-      if (rid < this.regionInfo.getRegionId()) {
-        LOG.warn("Clock skew; parent regions id is " +
-          this.regionInfo.getRegionId() + " but current time here is " + rid);
-        rid = this.regionInfo.getRegionId() + 1;
-      }
-      HRegionInfo regionAInfo = new HRegionInfo(this.regionInfo.getTableDesc(),
-        startKey, splitRow, false, rid);
-      Path dirA = getSplitDirForDaughter(splits, regionAInfo);
-      HRegionInfo regionBInfo = new HRegionInfo(this.regionInfo.getTableDesc(),
-        splitRow, endKey, false, rid);
-      Path dirB = getSplitDirForDaughter(splits, regionBInfo);
-
-      // Now close the HRegion.  Close returns all store files or null if not
-      // supposed to close (? What to do in this case? Implement abort of close?)
-      // Close also does wait on outstanding rows and calls a flush just-in-case.
-      List<StoreFile> hstoreFilesToSplit = close(false);
-      if (hstoreFilesToSplit == null) {
-        LOG.warn("Close came back null (Implement abort of close?)");
-        throw new RuntimeException("close returned empty vector of HStoreFiles");
-      }
-
-      // Split each store file.
-      for(StoreFile h: hstoreFilesToSplit) {
-        StoreFile.split(fs,
-          Store.getStoreHomedir(splits, regionAInfo.getEncodedName(),
-            h.getFamily()),
-          h, splitRow, Range.bottom);
-        StoreFile.split(fs,
-          Store.getStoreHomedir(splits, regionBInfo.getEncodedName(),
-            h.getFamily()),
-          h, splitRow, Range.top);
-      }
-
-      // Create a region instance and then move the splits into place under
-      // regionA and regionB.
-      HRegion regionA =
-        HRegion.newHRegion(tableDir, log, fs, conf, regionAInfo, null);
-      moveInitialFilesIntoPlace(this.fs, dirA, regionA.getRegionDir());
-      HRegion regionB =
-        HRegion.newHRegion(tableDir, log, fs, conf, regionBInfo, null);
-      moveInitialFilesIntoPlace(this.fs, dirB, regionB.getRegionDir());
-
-      return new HRegion [] {regionA, regionB};
-    }
-  }
-
-  /*
-   * Get the daughter directories in the splits dir.  The splits dir is under
-   * the parent regions' directory.
-   * @param splits
-   * @param hri
-   * @return Path to split dir.
-   * @throws IOException
-   */
-  private Path getSplitDirForDaughter(final Path splits, final HRegionInfo hri)
-  throws IOException {
-    Path d =
-      new Path(splits, hri.getEncodedName());
-    if (fs.exists(d)) {
-      // This should never happen; the splits dir will be newly made when we
-      // come in here.  Even if we crashed midway through a split, the reopen
-      // of the parent region clears out the dir in its initialize method.
-      throw new IOException("Cannot split; target file collision at " + d);
-    }
-    return d;
-  }
-
-  protected void prepareToSplit() {
-    // nothing
-  }
-
-  /*
    * Do preparation for pending compaction.
    * @throws IOException
    */
@@ -796,12 +685,16 @@ public class HRegion implements HeapSize
    */
   byte [] compactStores(final boolean majorCompaction)
   throws IOException {
-    if (this.closing.get() || this.closed.get()) {
-      LOG.debug("Skipping compaction on " + this + " because closing/closed");
+    if (this.closing.get()) {
+      LOG.debug("Skipping compaction on " + this + " because closing");
       return null;
     }
-    splitsAndClosesLock.readLock().lock();
+    lock.readLock().lock();
     try {
+      if (this.closed.get()) {
+        LOG.debug("Skipping compaction on " + this + " because closed");
+        return null;
+      }
       byte [] splitRow = null;
       if (this.closed.get()) {
         return splitRow;
@@ -840,7 +733,7 @@ public class HRegion implements HeapSize
       }
       return splitRow;
     } finally {
-      splitsAndClosesLock.readLock().unlock();
+      lock.readLock().unlock();
     }
   }
 
@@ -865,41 +758,48 @@ public class HRegion implements HeapSize
    * because a Snapshot was not properly persisted.
    */
   public boolean flushcache() throws IOException {
-    if (this.closed.get()) {
+    // fail-fast instead of waiting on the lock
+    if (this.closing.get()) {
+      LOG.debug("Skipping flush on " + this + " because closing");
       return false;
     }
-    synchronized (writestate) {
-      if (!writestate.flushing && writestate.writesEnabled) {
-        this.writestate.flushing = true;
-      } else {
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("NOT flushing memstore for region " + this +
-            ", flushing=" +
-              writestate.flushing + ", writesEnabled=" +
-              writestate.writesEnabled);
-        }
+    lock.readLock().lock();
+    try {
+      if (this.closed.get()) {
+        LOG.debug("Skipping flush on " + this + " because closed");
         return false;
       }
-    }
-    try {
-      // Prevent splits and closes
-      splitsAndClosesLock.readLock().lock();
       try {
+        synchronized (writestate) {
+          if (!writestate.flushing && writestate.writesEnabled) {
+            this.writestate.flushing = true;
+          } else {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("NOT flushing memstore for region " + this +
+                  ", flushing=" +
+                  writestate.flushing + ", writesEnabled=" +
+                  writestate.writesEnabled);
+            }
+            return false;
+          }
+        }
         return internalFlushcache();
       } finally {
-        splitsAndClosesLock.readLock().unlock();
+        synchronized (writestate) {
+          writestate.flushing = false;
+          this.writestate.flushRequested = false;
+          writestate.notifyAll();
+        }
       }
     } finally {
-      synchronized (writestate) {
-        writestate.flushing = false;
-        this.writestate.flushRequested = false;
-        writestate.notifyAll();
-      }
+      lock.readLock().unlock();
     }
   }
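
Note the two-level guard flushcache now uses: the region lock's read side
excludes a concurrent close, while the writestate monitor still serializes
flushers; the flag is always cleared with a notifyAll in a finally so a
waiting close() wakes up. Sketched on its own (illustrative, not the HBase
API):

    class FlushGateSketch {
      private boolean flushing = false;

      boolean flushcache() {
        synchronized (this) {
          if (flushing) return false; // another flush already in progress
          flushing = true;
        }
        try {
          return internalFlush();
        } finally {
          synchronized (this) {
            flushing = false;
            notifyAll();              // wake close(), which waits on this flag
          }
        }
      }

      private boolean internalFlush() { return true; /* stand-in */ }
    }
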
 
   /**
-   * Flushing the cache is a little tricky. We have a lot of updates in the
+   * Flush the memstore.
+   * 
+   * Flushing the memstore is a little tricky. We have a lot of updates in the
    * memstore, all of which have also been written to the log. We need to
    * write those updates in the memstore out to disk, while being able to
    * process reads/writes as much as possible during the flush operation. Also,
@@ -931,6 +831,19 @@ public class HRegion implements HeapSize
    * because a Snapshot was not properly persisted.
    */
   protected boolean internalFlushcache() throws IOException {
+    return internalFlushcache(this.log, -1);
+  }
+
+  /**
+   * @param wal Null if we're NOT to go via hlog/wal.
+   * @param myseqid The seqid to use if <code>wal</code> is null writing out
+   * flush file.
+   * @return true if the region needs compacting
+   * @throws IOException
+   * @see #internalFlushcache()
+   */
+  protected boolean internalFlushcache(final HLog wal, final long myseqid)
+  throws IOException {
     final long startTime = EnvironmentEdgeManager.currentTimeMillis();
     // Clear flush flag.
     // Record latest flush time
@@ -942,7 +855,8 @@ public class HRegion implements HeapSize
     if (LOG.isDebugEnabled()) {
       LOG.debug("Started memstore flush for region " + this +
         ". Current region memstore size " +
-          StringUtils.humanReadableInt(this.memstoreSize.get()));
+        StringUtils.humanReadableInt(this.memstoreSize.get()) +
+        ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid));
     }
 
     // Stop updates while we snapshot the memstore of all stores. We only have
@@ -955,14 +869,14 @@ public class HRegion implements HeapSize
     long sequenceId = -1L;
     long completeSequenceId = -1L;
 
-    // we have to take a write lock during snapshot, or else a write could
+    // We have to take a write lock during snapshot, or else a write could
     // end up in both snapshot and memstore (makes it difficult to do atomic
     // rows then)
     this.updatesLock.writeLock().lock();
     final long currentMemStoreSize = this.memstoreSize.get();
     List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
     try {
-      sequenceId = log.startCacheFlush();
+      sequenceId = (wal == null)? myseqid: wal.startCacheFlush();
       completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
 
       for (Store s : stores.values()) {
@@ -992,38 +906,14 @@ public class HRegion implements HeapSize
       for (StoreFlusher flusher : storeFlushers) {
         flusher.flushCache();
       }
-
-      Callable<Void> atomicWork = internalPreFlushcacheCommit();
-
-      LOG.debug("Caches flushed, doing commit now (which includes update scanners)");
-
-      /**
-       * Switch between memstore(snapshot) and the new store file
-       */
-      if (atomicWork != null) {
-        LOG.debug("internalPreFlushcacheCommit gives us work to do, acquiring newScannerLock");
-        newScannerLock.writeLock().lock();
-      }
-
-      try {
-	if (atomicWork != null) {
-	  atomicWork.call();
-	}
-
-        // Switch snapshot (in memstore) -> new hfile (thus causing
-        // all the store scanners to reset/reseek).
-        for (StoreFlusher flusher : storeFlushers) {
-          boolean needsCompaction = flusher.commit();
-          if (needsCompaction) {
-            compactionRequested = true;
-          }
-        }
-      } finally {
-        if (atomicWork != null) {
-          newScannerLock.writeLock().unlock();
+      // Switch snapshot (in memstore) -> new hfile (thus causing
+      // all the store scanners to reset/reseek).
+      for (StoreFlusher flusher : storeFlushers) {
+        boolean needsCompaction = flusher.commit();
+        if (needsCompaction) {
+          compactionRequested = true;
         }
       }
-
       storeFlushers.clear();
 
       // Set down the memstore size by amount of flush.
@@ -1035,7 +925,7 @@ public class HRegion implements HeapSize
       // We used to only catch IOEs but its possible that we'd get other
       // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
       // all and sundry.
-      this.log.abortCacheFlush();
+      if (wal != null) wal.abortCacheFlush();
       DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
           Bytes.toStringBinary(getRegionName()));
       dse.initCause(t);
@@ -1049,9 +939,11 @@ public class HRegion implements HeapSize
     //     This tells future readers that the HStores were emitted correctly,
     //     and that all updates to the log for this regionName that have lower
     //     log-sequence-ids can be safely ignored.
-    this.log.completeCacheFlush(getRegionName(),
+    if (wal != null) {
+      wal.completeCacheFlush(getRegionName(),
         regionInfo.getTableDesc().getName(), completeSequenceId,
         this.getRegionInfo().isMetaRegion());
+    }
 
     // C. Finally notify anyone waiting on memstore to clear:
     // e.g. checkResources().
@@ -1063,27 +955,13 @@ public class HRegion implements HeapSize
       long now = EnvironmentEdgeManager.currentTimeMillis();
       LOG.info("Finished memstore flush of ~" +
         StringUtils.humanReadableInt(currentMemStoreSize) + " for region " +
-        this + " in " + (now - startTime) + "ms, sequence id=" + sequenceId +
-        ", compaction requested=" + compactionRequested);
+        this + " in " + (now - startTime) + "ms, sequenceid=" + sequenceId +
+        ", compaction requested=" + compactionRequested +
+        ((wal == null)? "; wal=null": ""));
     }
     return compactionRequested;
   }
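
The new overload exists so recovered-edits replay can flush before the region
is online: with a null wal every HLog interaction above (startCacheFlush,
abortCacheFlush, completeCacheFlush) is skipped and the caller-supplied seqid
is used instead. The branching, reduced to a sketch with a stand-in
interface:

    interface HLogLike {
      long startCacheFlush();
    }

    class FlushSeqidSketch {
      // wal == null means "replaying split edits, not yet online": use the
      // region-local seqid the caller passed in rather than the shared log's.
      static long flushSequenceId(HLogLike wal, long myseqid) {
        return (wal == null) ? myseqid : wal.startCacheFlush();
      }
    }
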
 
-
-   /**
-    * A hook for sub classed wishing to perform operations prior to the cache
-    * flush commit stage.
-    *
-    * If a subclass wishes that an atomic update of their work and the
-    * flush commit stage happens, they should return a callable. The new scanner
-    * lock will be acquired and released.
-
-    * @throws java.io.IOException allow children to throw exception
-    */
-   protected Callable<Void> internalPreFlushcacheCommit() throws IOException {
-     return null;
-   }
-
    /**
    * Get the sequence number to be associated with this cache flush. Used by
    * TransactionalRegion to not complete pending transactions.
@@ -1129,7 +1007,7 @@ public class HRegion implements HeapSize
     // closest key is across all column families, since the data may be sparse
     KeyValue key = null;
     checkRow(row);
-    splitsAndClosesLock.readLock().lock();
+    startRegionOperation();
     try {
       Store store = getStore(family);
       KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
@@ -1142,7 +1020,7 @@ public class HRegion implements HeapSize
       get.addFamily(family);
       return get(get, null);
     } finally {
-      splitsAndClosesLock.readLock().unlock();
+      closeRegionOperation();
     }
   }
 
@@ -1162,11 +1040,8 @@ public class HRegion implements HeapSize
   }
 
   protected InternalScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
-    newScannerLock.readLock().lock();
+    startRegionOperation();
     try {
-      if (this.closing.get() || this.closed.get()) {
-        throw new NotServingRegionException("Region " + this + " closed");
-      }
       // Verify families are all valid
       if(scan.hasFamilies()) {
         for(byte [] family : scan.getFamilyMap().keySet()) {
@@ -1180,7 +1055,7 @@ public class HRegion implements HeapSize
       return instantiateInternalScanner(scan, additionalScanners);
 
     } finally {
-      newScannerLock.readLock().unlock();
+      closeRegionOperation();
     }
   }
 
@@ -1222,7 +1097,7 @@ public class HRegion implements HeapSize
     checkReadOnly();
     checkResources();
     Integer lid = null;
-    splitsAndClosesLock.readLock().lock();
+    startRegionOperation();
     try {
       byte [] row = delete.getRow();
       // If we did not pass an existing row lock, obtain a new one
@@ -1234,7 +1109,7 @@ public class HRegion implements HeapSize
 
     } finally {
       if(lockid == null) releaseRowLock(lid);
-      splitsAndClosesLock.readLock().unlock();
+      closeRegionOperation();
     }
   }
 
@@ -1367,8 +1242,7 @@ public class HRegion implements HeapSize
     // read lock, resources may run out.  For now, the thought is that this
     // will be extremely rare; we'll deal with it when it happens.
     checkResources();
-    splitsAndClosesLock.readLock().lock();
-
+    startRegionOperation();
     try {
       // We obtain a per-row lock, so other clients will block while one client
       // performs an update. The read lock is released by the client calling
@@ -1386,7 +1260,7 @@ public class HRegion implements HeapSize
         if(lockid == null) releaseRowLock(lid);
       }
     } finally {
-      splitsAndClosesLock.readLock().unlock();
+      closeRegionOperation();
     }
   }
 
@@ -1439,12 +1313,12 @@ public class HRegion implements HeapSize
       checkResources();
 
       long newSize;
-      splitsAndClosesLock.readLock().lock();
+      startRegionOperation();
       try {
         long addedSize = doMiniBatchPut(batchOp);
         newSize = memstoreSize.addAndGet(addedSize);
       } finally {
-        splitsAndClosesLock.readLock().unlock();
+        closeRegionOperation();
       }
       if (isFlushSize(newSize)) {
         requestFlush();
@@ -1582,7 +1456,7 @@ public class HRegion implements HeapSize
     if (!isPut && !(w instanceof Delete))
       throw new IOException("Action must be Put or Delete");
 
-    splitsAndClosesLock.readLock().lock();
+    startRegionOperation();
     try {
       RowLock lock = isPut ? ((Put)w).getRowLock() : ((Delete)w).getRowLock();
       Get get = new Get(row, lock);
@@ -1596,7 +1470,8 @@ public class HRegion implements HeapSize
         result = get(get);
 
         boolean matches = false;
-        if (result.size() == 0 && expectedValue.length == 0) {
+        if (result.size() == 0 &&
+            (expectedValue == null || expectedValue.length == 0)) {
           matches = true;
         } else if (result.size() == 1) {
           //Compare the expected value with the actual value
@@ -1620,7 +1495,7 @@ public class HRegion implements HeapSize
         if(lockId == null) releaseRowLock(lid);
       }
     } finally {
-      splitsAndClosesLock.readLock().unlock();
+      closeRegionOperation();
     }
   }
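
The widened check above makes a null expectedValue behave like an empty one
when the row has no current value. The predicate on its own, as a sketch:

    class CheckAndMutateSketch {
      // Matches when the row is empty and the caller expects "no value",
      // whether that expectation came in as null or as a zero-length array.
      static boolean emptyMatches(int resultSize, byte[] expectedValue) {
        return resultSize == 0 &&
            (expectedValue == null || expectedValue.length == 0);
      }
    }
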
 
@@ -1828,46 +1703,75 @@ public class HRegion implements HeapSize
    * Read the edits log put under this region by wal log splitting process.  Put
    * the recovered edits back up into this region.
    *
-   * We can ignore any log message that has a sequence ID that's equal to or
+   * <p>We can ignore any log message that has a sequence ID that's equal to or
    * lower than minSeqId.  (Because we know such log messages are already
    * reflected in the HFiles.)
+   * 
+   * <p>While this is running we are putting pressure on memory yet we are
+   * outside of our usual accounting because we are not yet an online region
+   * (this stuff is being run as part of Region initialization).  This means
+   * that if we're up against global memory limits, we'll not be flagged to
+   * flush because we are not online.  We can't be flushed by the usual
+   * mechanisms anyway; we're not yet online, so our relative sequenceids are
+   * not yet aligned with HLog sequenceids -- not until we come online, after
+   * processing of the split edits.
+   * 
+   * <p>But to help relieve memory pressure, at least manage our own heap
+   * size, flushing if we are in excess of per-region limits.  Flushing,
+   * though, we have to be careful to avoid using the regionserver/hlog
+   * sequenceid; it runs on a different line from what is going on in this
+   * region context, so if we crashed replaying these edits but in the midst
+   * had a flush that used the regionserver log with a sequenceid in excess
+   * of this region's, then we could miss edits the next time we go to
+   * recover.  So, we have to flush inline, using seqids that make sense in
+   * this single-region context only -- until we come online.
+   * 
    * @param regiondir
-   * @param minSeqId Minimum sequenceid found in a store file.  Edits in log
-   * must be larger than this to be replayed.
+   * @param minSeqId Any edit found in split editlogs needs to be in excess of
+   * this minSeqId to be applied, else it is skipped.
    * @param reporter
    * @return the sequence id of the last edit added to this region out of the
-   * recovered edits log, or -1 if no log recovered
+   * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
    * @throws UnsupportedEncodingException
    * @throws IOException
    */
   protected long replayRecoveredEditsIfAny(final Path regiondir,
       final long minSeqId, final Progressable reporter)
   throws UnsupportedEncodingException, IOException {
-    Path edits = new Path(regiondir, HLog.RECOVERED_EDITS);
-    if (edits == null || !this.fs.exists(edits)) return -1;
-    if (isZeroLengthThenDelete(this.fs, edits)) return -1;
-    long maxSeqIdInLog = -1;
-    try {
-      maxSeqIdInLog = replayRecoveredEdits(edits, minSeqId, reporter);
-      LOG.debug("Deleting recovered edits file: " + edits);
-      if (!this.fs.delete(edits, false)) {
-        LOG.error("Failed delete of " + edits);
-      }
-    } catch (IOException e) {
-      boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
-      if (skipErrors) {
-        Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." +
-          System.currentTimeMillis());
-        LOG.error("hbase.skip.errors=true so continuing. Renamed " + edits +
-          " as " + moveAsideName, e);
-        if (!this.fs.rename(edits, moveAsideName)) {
-          LOG.error("hbase.skip.errors=true so continuing. Rename failed");
+    long seqid = minSeqId;
+    NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
+    if (files == null || files.isEmpty()) return seqid;
+    for (Path edits: files) {
+      if (edits == null || !this.fs.exists(edits)) {
+        LOG.warn("Null or non-existent edits file: " + edits);
+        continue;
+      }
+      if (isZeroLengthThenDelete(this.fs, edits)) continue;
+      try {
+        seqid = replayRecoveredEdits(edits, seqid, reporter);
+      } catch (IOException e) {
+        boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
+        if (skipErrors) {
+          Path p = HLog.moveAsideBadEditsFile(fs, edits);
+          LOG.error("hbase.skip.errors=true so continuing. Renamed " + edits +
+            " as " + p, e);
+        } else {
+          throw e;
+        }
+      }
+    }
+    if (seqid > minSeqId) {
+      // Then we added some edits to memory. Flush and cleanup split edit files.
+      internalFlushcache(null, seqid);
+      for (Path file: files) {
+        if (!this.fs.delete(file, false)) {
+          LOG.error("Failed delete of " + file);
+        } else {
+          LOG.debug("Deleted recovered.edits file=" + file);
         }
-      } else {
-        throw e;
       }
     }
-    return maxSeqIdInLog;
+    return seqid;
   }
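
The rewritten replay walks every file HLog.getSplitEditFilesSorted returns,
threading one running seqid through them, moving bad files aside when
hbase.skip.errors=true, and flushing inline once at the end before deleting
the edit files. The control flow, as a standalone sketch (helper bodies are
stubs):

    import java.io.IOException;
    import java.util.List;

    class ReplaySketch {
      long replayAll(List<String> sortedEditFiles, long minSeqId,
          boolean skipErrors) throws IOException {
        long seqid = minSeqId;
        for (String edits : sortedEditFiles) {
          try {
            seqid = replayOne(edits, seqid); // >= seqid; older edits skipped
          } catch (IOException e) {
            if (!skipErrors) throw e;
            moveAside(edits);                // rename bad file and continue
          }
        }
        if (seqid > minSeqId) {
          flushInline(seqid);                // wal-null flush, then delete files
        }
        return seqid;
      }

      long replayOne(String edits, long fromSeqid) throws IOException {
        return fromSeqid; // stand-in
      }
      void moveAside(String edits) { }
      void flushInline(long seqid) { }
    }
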
 
   /*
@@ -1876,12 +1780,13 @@ public class HRegion implements HeapSize
    * must be larger than this to be replayed.
    * @param reporter
    * @return the sequence id of the last edit added to this region out of the
-   * recovered edits log, or -1 if no log recovered
+   * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
    * @throws IOException
    */
   private long replayRecoveredEdits(final Path edits,
       final long minSeqId, final Progressable reporter)
     throws IOException {
+    LOG.info("Replaying edits from " + edits + "; minSequenceid=" + minSeqId);
     HLog.Reader reader = HLog.getReader(this.fs, edits, conf);
     try {
       return replayRecoveredEdits(reader, minSeqId, reporter);
@@ -1891,26 +1796,22 @@ public class HRegion implements HeapSize
   }
 
  /* @param reader Reader against file of recovered edits.
-  * @param minSeqId Minimum sequenceid found in a store file.  Edits in log
-  * must be larger than this to be replayed.
+  * @param minSeqId Any edit found in split editlogs needs to be in excess of
+  * this minSeqId to be applied, else it is skipped.
   * @param reporter
   * @return the sequence id of the last edit added to this region out of the
-  * recovered edits log, or -1 if no log recovered
+  * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
   * @throws IOException
   */
   private long replayRecoveredEdits(final HLog.Reader reader,
     final long minSeqId, final Progressable reporter)
   throws IOException {
-    long currentEditSeqId = -1;
+    long currentEditSeqId = minSeqId;
     long firstSeqIdInLog = -1;
     long skippedEdits = 0;
     long editsCount = 0;
     HLog.Entry entry;
     Store store = null;
-    // Get map of family name to maximum sequence id.  Do it here up front
-    // because as we progress, the sequence id can change if we happen to flush
-    // The flush ups the seqid for the Store.  The new seqid can cause us skip edits.
-    Map<byte [], Long> familyToOriginalMaxSeqId = familyToMaxSeqId(this.stores);
     // How many edits to apply before we send a progress report.
     int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
     while ((entry = reader.next()) != null) {
@@ -1920,12 +1821,13 @@ public class HRegion implements HeapSize
         firstSeqIdInLog = key.getLogSeqNum();
       }
       // Now, figure if we should skip this edit.
-      currentEditSeqId = Math.max(currentEditSeqId, key.getLogSeqNum());
-      if (key.getLogSeqNum() <= minSeqId) {
+      if (key.getLogSeqNum() <= currentEditSeqId) {
         skippedEdits++;
         continue;
       }
-      for (KeyValue kv : val.getKeyValues()) {
+      currentEditSeqId = key.getLogSeqNum();
+      boolean flush = false;
+      for (KeyValue kv: val.getKeyValues()) {
         // Check this edit is for me. Also, guard against writing the special
         // METACOLUMN info such as HBASE::CACHEFLUSH entries
         if (kv.matchingFamily(HLog.METAFAMILY) ||
@@ -1944,16 +1846,13 @@ public class HRegion implements HeapSize
           skippedEdits++;
           continue;
         }
-        // The edits' id has to be in excess of the original max seqid of the
-        // targeted store.
-        long storeMaxSeqId = familyToOriginalMaxSeqId.get(store.getFamily().getName());
-        if (currentEditSeqId < storeMaxSeqId) {
-          skippedEdits++;
-          continue;
-        }
-        restoreEdit(kv);
+        // Once we are over the limit, restoreEdit will keep returning true to
+        // flush -- but don't flush until we've played all the kvs that make up
+        // the WALEdit.
+        flush = restoreEdit(store, kv);
         editsCount++;
      }
+     if (flush) internalFlushcache(null, currentEditSeqId);
 
       // Every 'interval' edits, tell the reporter we're making progress.
       // Have seen 60k edits taking 3minutes to complete.
@@ -1963,40 +1862,20 @@ public class HRegion implements HeapSize
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits +
-        ", firstSeqIdInLog=" + firstSeqIdInLog +
-        ", maxSeqIdInLog=" + currentEditSeqId);
+        ", firstSequenceidInLog=" + firstSeqIdInLog +
+        ", maxSequenceidInLog=" + currentEditSeqId);
     }
     return currentEditSeqId;
   }
 
-  /*
-   * @param stores
-   * @return Map of family name to maximum sequenceid.
-   */
-  private Map<byte [], Long> familyToMaxSeqId(final Map<byte [], Store> stores) {
-    Map<byte [], Long> map = new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
-    for (Map.Entry<byte [], Store> e: stores.entrySet()) {
-      map.put(e.getKey(), e.getValue().getMaxSequenceId());
-    }
-    return map;
-  }
-
-  /*
-   * @param kv Apply this value to this region.
-   * @throws IOException
+  /**
+   * Protected so it can be called from tests.
+   * @param s Store to add edit to.
+   * @param kv KeyValue to add.
+   * @return True if we should flush.
    */
-  // This method is protected so can be called from tests.
-  protected void restoreEdit(final KeyValue kv) throws IOException {
-    // This is really expensive to do per edit.  Loads of object creation.
-    // TODO: Optimization.  Apply edits batched by family.
-    Map<byte [], List<KeyValue>> map =
-      new TreeMap<byte [], List<KeyValue>>(Bytes.BYTES_COMPARATOR);
-    map.put(kv.getFamily(), Collections.singletonList(kv));
-    if (kv.isDelete()) {
-      delete(map, true);
-    } else {
-      put(map, true);
-    }
+  protected boolean restoreEdit(final Store s, final KeyValue kv) {
+    return isFlushSize(this.memstoreSize.addAndGet(s.add(kv)));
   }
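
restoreEdit now adds the KeyValue straight to the store and reports whether
the region-wide memstore size has crossed the flush threshold; the replay
loop defers the actual flush to the WALEdit boundary so a multi-kv edit is
never half-applied. A sketch of the accounting (the threshold is an assumed
per-region limit, not a value from this diff):

    import java.util.concurrent.atomic.AtomicLong;

    class RestoreEditSketch {
      final AtomicLong memstoreSize = new AtomicLong();
      final long flushSize = 64L * 1024 * 1024; // assumed per-region limit

      // heapSizeAdded is what the store reports for the applied KeyValue.
      boolean restoreEdit(long heapSizeAdded) {
        return memstoreSize.addAndGet(heapSizeAdded) > flushSize;
      }
    }
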
 
   /*
@@ -2069,7 +1948,12 @@ public class HRegion implements HeapSize
    * @return The id of the held lock.
    */
   public Integer obtainRowLock(final byte [] row) throws IOException {
-    return internalObtainRowLock(row, true);
+    startRegionOperation();
+    try {
+      return internalObtainRowLock(row, true);
+    } finally {
+      closeRegionOperation();
+    }
   }
 
   /**
@@ -2079,7 +1963,12 @@ public class HRegion implements HeapSize
    * @see HRegion#obtainRowLock(byte[])
    */
   public Integer tryObtainRowLock(final byte[] row) throws IOException {
-    return internalObtainRowLock(row, false);
+    startRegionOperation();
+    try {
+      return internalObtainRowLock(row, false);
+    } finally {
+      closeRegionOperation();
+    }
   }
   
   /**
@@ -2091,11 +1980,8 @@ public class HRegion implements HeapSize
   private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
   throws IOException {
     checkRow(row);
-    splitsAndClosesLock.readLock().lock();
+    startRegionOperation();
     try {
-      if (this.closed.get()) {
-        throw new NotServingRegionException(this + " is closed");
-      }
       synchronized (lockedRows) {
         while (lockedRows.contains(row)) {
           if (!waitForLock) {
@@ -2129,7 +2015,7 @@ public class HRegion implements HeapSize
         return lockId;
       }
     } finally {
-      splitsAndClosesLock.readLock().unlock();
+      closeRegionOperation();
     }
   }
   
@@ -2193,24 +2079,9 @@ public class HRegion implements HeapSize
     return lid;
   }
 
-  private void waitOnRowLocks() {
-    synchronized (lockedRows) {
-      while (!this.lockedRows.isEmpty()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Waiting on " + this.lockedRows.size() + " row locks");
-        }
-        try {
-          this.lockedRows.wait();
-        } catch (InterruptedException e) {
-          // Catch. Let while test determine loop-end.
-        }
-      }
-    }
-  }
-
   public void bulkLoadHFile(String hfilePath, byte[] familyName)
   throws IOException {
-    splitsAndClosesLock.readLock().lock();
+    startRegionOperation();
     try {
       Store store = getStore(familyName);
       if (store == null) {
@@ -2219,7 +2090,7 @@ public class HRegion implements HeapSize
       }
       store.bulkLoadHFile(hfilePath);
     } finally {
-      splitsAndClosesLock.readLock().unlock();
+      closeRegionOperation();
     }
 
   }
@@ -2312,24 +2183,24 @@ public class HRegion implements HeapSize
             "after we renewed it. Could be caused by a very slow scanner " +
             "or a lengthy garbage collection");
       }
-      if (closing.get() || closed.get()) {
-        close();
-        throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
-          " is closing=" + closing.get() + " or closed=" + closed.get());
-      }
+      startRegionOperation();
+      try {
 
-      // This could be a new thread from the last time we called next().
-      ReadWriteConsistencyControl.setThreadReadPoint(this.readPt);
+        // This could be a new thread from the last time we called next().
+        ReadWriteConsistencyControl.setThreadReadPoint(this.readPt);
 
-      results.clear();
-      boolean returnResult = nextInternal(limit);
+        results.clear();
+        boolean returnResult = nextInternal(limit);
 
-      outResults.addAll(results);
-      resetFilters();
-      if (isFilterDone()) {
-        return false;
+        outResults.addAll(results);
+        resetFilters();
+        if (isFilterDone()) {
+          return false;
+        }
+        return returnResult;
+      } finally {
+        closeRegionOperation();
       }
-      return returnResult;
     }
 
     public synchronized boolean next(List<KeyValue> outResults)
@@ -2669,17 +2540,6 @@ public class HRegion implements HeapSize
   /**
    * Computes the Path of the HRegion
    *
-   * @param tabledir qualified path for table
-   * @param name ENCODED region name
-   * @return Path of HRegion directory
-   */
-  public static Path getRegionDir(final Path tabledir, final String name) {
-    return new Path(tabledir, name);
-  }
-
-  /**
-   * Computes the Path of the HRegion
-   *
    * @param rootdir qualified path of HBase root directory
    * @param info HRegionInfo for the region
    * @return qualified path of region directory
@@ -2999,50 +2859,52 @@ public class HRegion implements HeapSize
     checkRow(row);
     boolean flush = false;
     // Lock row
-    Integer lid = obtainRowLock(row);
     long result = amount;
+    startRegionOperation();
     try {
-      Store store = stores.get(family);
-
-      // TODO call the proper GET API
-      // Get the old value:
-      Get get = new Get(row);
-      get.addColumn(family, qualifier);
-      List<KeyValue> results = new ArrayList<KeyValue>();
-      NavigableSet<byte[]> qualifiers = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
-      qualifiers.add(qualifier);
-      store.get(get, qualifiers, results);
-
-      if (!results.isEmpty()) {
-        KeyValue kv = results.get(0);
-        byte [] buffer = kv.getBuffer();
-        int valueOffset = kv.getValueOffset();
-        result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG);
-      }
-
-      // bulid the KeyValue now:
-      KeyValue newKv = new KeyValue(row, family,
-          qualifier, EnvironmentEdgeManager.currentTimeMillis(),
-          Bytes.toBytes(result));
+      Integer lid = obtainRowLock(row);
+      try {
+        Store store = stores.get(family);
 
-      // now log it:
-      if (writeToWAL) {
-        long now = EnvironmentEdgeManager.currentTimeMillis();
-        WALEdit walEdit = new WALEdit();
-        walEdit.add(newKv);
-        this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
-          walEdit, now);
-      }
+        // Get the old value:
+        Get get = new Get(row);
+        get.addColumn(family, qualifier);
+
+        List<KeyValue> results = get(get);
+
+        if (!results.isEmpty()) {
+          KeyValue kv = results.get(0);
+          byte [] buffer = kv.getBuffer();
+          int valueOffset = kv.getValueOffset();
+          result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG);
+        }
+
+        // build the KeyValue now:
+        KeyValue newKv = new KeyValue(row, family,
+            qualifier, EnvironmentEdgeManager.currentTimeMillis(),
+            Bytes.toBytes(result));
+
+        // now log it:
+        if (writeToWAL) {
+          long now = EnvironmentEdgeManager.currentTimeMillis();
+          WALEdit walEdit = new WALEdit();
+          walEdit.add(newKv);
+          this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
+            walEdit, now);
+        }
 
-      // Now request the ICV to the store, this will set the timestamp
-      // appropriately depending on if there is a value in memcache or not.
-      // returns the
-      long size = store.updateColumnValue(row, family, qualifier, result);
+        // Now request the ICV from the store; this will set the timestamp
+        // appropriately depending on whether there is a value in memstore
+        // or not, and returns the size delta to add to the memstore size.
+        long size = store.updateColumnValue(row, family, qualifier, result);
 
-      size = this.memstoreSize.addAndGet(size);
-      flush = isFlushSize(size);
+        size = this.memstoreSize.addAndGet(size);
+        flush = isFlushSize(size);
+      } finally {
+        releaseRowLock(lid);
+      }
     } finally {
-      releaseRowLock(lid);
+      closeRegionOperation();
     }
 
     if (flush) {
@@ -3069,7 +2931,7 @@ public class HRegion implements HeapSize
 
   public static final long FIXED_OVERHEAD = ClassSize.align(
       (4 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN +
-      (20 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
+      (18 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.OBJECT + (2 * ClassSize.ATOMIC_BOOLEAN) +
@@ -3187,6 +3049,34 @@ public class HRegion implements HeapSize
   }
 
   /**
+   * This method needs to be called before any public call that reads or
+   * modifies data.  It has to be called just before a try block;
+   * #closeRegionOperation must then be called in that try's finally block.
+   * Acquires a read lock and checks if the region is closing or closed.
+   * @throws NotServingRegionException when the region is closing or closed
+   */
+  private void startRegionOperation() throws NotServingRegionException {
+    if (this.closing.get()) {
+      throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
+          " is closing");
+    }
+    lock.readLock().lock();
+    if (this.closed.get()) {
+      lock.readLock().unlock();
+      throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
+          " is closed");
+    }
+  }
+
+  /**
+   * Releases the lock. This needs to be called in the finally block corresponding
+   * to the try block of #startRegionOperation
+   */
+  private void closeRegionOperation() {
+    lock.readLock().unlock();
+  }
+
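
Every data path above (puts, deletes, scans, row locks, bulk loads,
increments) now brackets its work with this pair. The intended usage shape,
sketched with stand-in types:

    class RegionOperationSketch {
      void put(/* ... */) throws NotServingRegionExceptionLike {
        startRegionOperation();     // read-locks, rejects closing/closed
        try {
          // ... do the actual work under the read lock ...
        } finally {
          closeRegionOperation();   // always releases the read lock
        }
      }

      void startRegionOperation() throws NotServingRegionExceptionLike { }
      void closeRegionOperation() { }
    }

    class NotServingRegionExceptionLike extends Exception { }
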
+  /**
   * A mocked list implementation - discards all updates.
    */
   private static final List<KeyValue> MOCKED_LIST = new AbstractList<KeyValue>() {

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Aug 27 05:01:02 2010
@@ -49,6 +49,10 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -104,6 +108,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.handler.OpenRootHandler;
 import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.InfoServer;
@@ -250,6 +255,9 @@ public class HRegionServer implements HR
   // Instance of the hbase executor service.
   private ExecutorService service;
 
+  // Replication services
+  private Replication replicationHandler;
+
   /**
    * Starts a HRegionServer at the default location
    *
@@ -676,6 +684,14 @@ public class HRegionServer implements HR
             + this.serverInfo.getServerAddress() + ", Now=" + hsa.toString());
         this.serverInfo.setServerAddress(hsa);
       }
+      
+      // hack! Maps DFSClient => RegionServer for logs.  HDFS made this 
+      // config param for task trackers, but we can piggyback off of it.
+      if (this.conf.get("mapred.task.id") == null) {
+        this.conf.set("mapred.task.id", 
+            "hb_rs_" + this.serverInfo.getServerName());
+      }
+      
       // Master sent us hbase.rootdir to use. Should be fully qualified
       // path with file system specification included. Set 'fs.defaultFS'
       // to match the filesystem on hbase.rootdir else underlying hadoop hdfs
@@ -756,6 +772,11 @@ public class HRegionServer implements HR
    * @return Throwable converted to an IOE; methods can only let out IOEs.
    */
   private Throwable cleanup(final Throwable t, final String msg) {
+    // Don't log as error if NSRE; NSRE is 'normal' operation.
+    if (t instanceof NotServingRegionException) {
+      LOG.debug("NotServingRegionException; " +  t.getMessage());
+      return t;
+    }
     if (msg == null) {
       LOG.error("", RemoteExceptionHandler.checkThrowable(t));
     } else {
@@ -877,16 +898,19 @@ public class HRegionServer implements HR
           + "running at " + this.serverInfo.getServerName()
           + " because logdir " + logdir.toString() + " exists");
     }
-    HLog newlog = instantiateHLog(logdir, oldLogDir);
-    return newlog;
+    this.replicationHandler = new Replication(this, this.fs, logdir, oldLogDir);
+    HLog log = instantiateHLog(logdir, oldLogDir);
+    this.replicationHandler.addLogEntryVisitor(log);
+    return log;
   }
 
   // instantiate
-  protected HLog instantiateHLog(Path logdir, Path oldLogDir)
-      throws IOException {
-    HLog newlog = new HLog(fs, logdir, oldLogDir, conf, hlogRoller, null,
-        serverInfo.getServerAddress().toString());
-    return newlog;
+  protected HLog instantiateHLog(Path logdir, Path oldLogDir) throws IOException {
+    return new HLog(this.fs, logdir, oldLogDir, this.conf, this.hlogRoller,
+      this.replicationHandler != null?
+        this.replicationHandler.getReplicationManager():
+        null,
+        this.serverInfo.getServerAddress().toString());
   }
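
The ordering matters here: the Replication handler is constructed before the
HLog so that instantiateHLog can hand the replication manager to the log as a
listener, and the log-entry visitor is attached afterwards. Reduced to a
sketch with stand-in types (not the actual HBase classes):

    class WalSetupSketch {
      ReplicationLike replicationHandler;

      WalLike setupWalAndReplication() {
        replicationHandler = new ReplicationLike();      // 1. handler first
        WalLike log = new WalLike(
            replicationHandler.getReplicationManager()); // 2. log gets manager
        replicationHandler.addLogEntryVisitor(log);      // 3. then the visitor
        return log;
      }
    }

    class ReplicationLike {
      Object getReplicationManager() { return new Object(); }
      void addLogEntryVisitor(WalLike log) { }
    }

    class WalLike {
      WalLike(Object replicationListener) { }
    }
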
 
   protected LogRoller getLogRoller() {
@@ -1026,13 +1050,15 @@ public class HRegionServer implements HR
           port++;
           // update HRS server info port.
           this.serverInfo = new HServerInfo(this.serverInfo.getServerAddress(),
-              this.serverInfo.getStartCode(), port, this.serverInfo
-                  .getHostname());
+            this.serverInfo.getStartCode(), port,
+            this.serverInfo.getHostname());
         }
       }
     }
 
-    // Start Server. This service is like leases in that it internally runs
+    this.replicationHandler.startReplicationServices();
+
+    // Start Server.  This service is like leases in that it internally runs
     // a thread.
     this.server.start();
     LOG.info("HRegionServer started at: "
@@ -1119,7 +1145,7 @@ public class HRegionServer implements HR
     this.abortRequested = true;
     this.reservedSpace.clear();
     if (this.metrics != null) {
-      LOG.info("Dump of metrics: " + this.metrics.toString());
+      LOG.info("Dump of metrics: " + this.metrics);
     }
     stop(reason);
   }
@@ -1151,6 +1177,7 @@ public class HRegionServer implements HR
     Threads.shutdown(this.compactSplitThread);
     Threads.shutdown(this.hlogRoller);
     this.service.shutdown();
+    this.replicationHandler.join();
   }
 
   /**
@@ -1254,10 +1281,6 @@ public class HRegionServer implements HR
             + newRegionB.getRegionNameAsString())));
   }
 
-  // ////////////////////////////////////////////////////////////////////////////
-  // HMaster-given operations
-  // ////////////////////////////////////////////////////////////////////////////
-
   /**
    * Closes all regions.  Called on our way out.
    * Assumes that its not possible for new regions to be added to onlineRegions
@@ -2221,7 +2244,7 @@ public class HRegionServer implements HR
     if (message != null) {
       System.err.println(message);
     }
-    System.err.println("Usage: java org.apache.hbase.HRegionServer start|stop");
+    System.err.println("Usage: java org.apache.hadoop.hbase.regionserver.HRegionServer start|stop [-D <conf.param=value>]");
     System.exit(0);
   }
 
@@ -2245,6 +2268,11 @@ public class HRegionServer implements HR
     }
   }
 
+  @Override
+  public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
+    this.replicationHandler.replicateLogEntries(entries);
+  }
+
   /**
    * Do class main.
    *
@@ -2254,15 +2282,26 @@ public class HRegionServer implements HR
    */
   protected static void doMain(final String[] args,
       final Class<? extends HRegionServer> regionServerClass) {
-    if (args.length < 1) {
-      printUsageAndExit();
-    }
     Configuration conf = HBaseConfiguration.create();
 
-    // Process command-line args. TODO: Better cmd-line processing
-    // (but hopefully something not as painful as cli options).
-    for (String cmd : args) {
-      if (cmd.equals("start")) {
+    Options opt = new Options();
+    opt.addOption("D", true, "Override HBase Configuration Settings");
+    try {
+      CommandLine cmd = new GnuParser().parse(opt, args);
+
+      if (cmd.hasOption("D")) {
+        for (String confOpt : cmd.getOptionValues("D")) {
+          String[] kv = confOpt.split("=", 2);
+          if (kv.length == 2) {
+            conf.set(kv[0], kv[1]);
+            LOG.debug("-D configuration override: " + kv[0] + "=" + kv[1]);
+          } else {
+            throw new ParseException("-D option format invalid: " + confOpt);
+          }
+        }
+      }
+
+      if (cmd.getArgList().contains("start")) {
         try {
           // If 'local', don't start a region server here. Defer to
           // LocalHBaseCluster. It manages 'local' clusters.
@@ -2278,19 +2317,20 @@ public class HRegionServer implements HR
             startRegionServer(hrs);
           }
         } catch (Throwable t) {
-          LOG.error("Can not start region server because "
-              + StringUtils.stringifyException(t));
+          LOG.error( "Can not start region server because "+
+              StringUtils.stringifyException(t) );
+          System.exit(-1);
         }
-        break;
-      }
-
-      if (cmd.equals("stop")) {
-        printUsageAndExit("To shutdown the regionserver run "
-            + "bin/hbase-daemon.sh stop regionserver or send a kill signal to"
-            + "the regionserver pid");
+      } else if (cmd.getArgList().contains("stop")) {
+        throw new ParseException("To shutdown the regionserver run " +
+            "bin/hbase-daemon.sh stop regionserver or send a kill signal to" +
+            "the regionserver pid");
+      } else {
+        throw new ParseException("Unknown argument(s): " +
+            org.apache.commons.lang.StringUtils.join(cmd.getArgs(), " "));
       }
-
-      // Print out usage if we get to here.
+    } catch (ParseException e) {
+      LOG.error("Could not parse", e);
       printUsageAndExit();
     }
   }
@@ -2305,4 +2345,8 @@ public class HRegionServer implements HR
         .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
     doMain(args, regionServerClass);
   }
+
+  public int getNumberOfOnlineRegions() {
+    return onlineRegions.size();
+  }
 }
\ No newline at end of file

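For illustration, the new -D handling in doMain() above boils down to the commons-cli pattern sketched below. This is a self-contained sketch, not HBase code: DashDOptionDemo and the Properties stand-in for Configuration are illustrative only.

import java.util.Properties;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

public class DashDOptionDemo {
  // Parses e.g.: start -D hbase.regionserver.port=60021 -D foo=bar
  public static void main(String[] args) throws ParseException {
    Options opt = new Options();
    opt.addOption("D", true, "Override configuration settings");
    CommandLine cmd = new GnuParser().parse(opt, args);

    Properties conf = new Properties(); // stand-in for HBaseConfiguration
    if (cmd.hasOption("D")) {
      for (String confOpt : cmd.getOptionValues("D")) {
        // Split on the first '=' only, so values may themselves contain '='.
        String[] kv = confOpt.split("=", 2);
        if (kv.length == 2) {
          conf.setProperty(kv[0], kv[1]);
        } else {
          throw new ParseException("-D option format invalid: " + confOpt);
        }
      }
    }
    // Non-option arguments ("start"/"stop") remain available:
    System.out.println("commands=" + cmd.getArgList() + ", overrides=" + conf);
  }
}
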
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java Fri Aug 27 05:01:02 2010
@@ -41,7 +41,7 @@ import java.util.PriorityQueue;
  * as an InternalScanner at the Store level, you will get runtime exceptions.
  */
 public class KeyValueHeap implements KeyValueScanner, InternalScanner {
-  private PriorityQueue<KeyValueScanner> heap;
+  private PriorityQueue<KeyValueScanner> heap = null;
   private KeyValueScanner current = null;
   private KVScannerComparator comparator;
 
@@ -51,22 +51,25 @@ public class KeyValueHeap implements Key
    * @param scanners
    * @param comparator
    */
-  public KeyValueHeap(List<? extends KeyValueScanner> scanners, KVComparator comparator) {
+  public KeyValueHeap(List<? extends KeyValueScanner> scanners,
+      KVComparator comparator) {
     this.comparator = new KVScannerComparator(comparator);
-    this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
-        this.comparator);
-    for (KeyValueScanner scanner : scanners) {
-      if (scanner.peek() != null) {
-        this.heap.add(scanner);
-      } else {
-        scanner.close();
+    if (!scanners.isEmpty()) {
+      this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
+          this.comparator);
+      for (KeyValueScanner scanner : scanners) {
+        if (scanner.peek() != null) {
+          this.heap.add(scanner);
+        } else {
+          scanner.close();
+        }
       }
+      this.current = heap.poll();
     }
-    this.current = heap.poll();
   }
 
   public KeyValue peek() {
-    if(this.current == null) {
+    if (this.current == null) {
       return null;
     }
     return this.current.peek();
@@ -78,12 +81,12 @@ public class KeyValueHeap implements Key
     }
     KeyValue kvReturn = this.current.next();
     KeyValue kvNext = this.current.peek();
-    if(kvNext == null) {
+    if (kvNext == null) {
       this.current.close();
       this.current = this.heap.poll();
     } else {
       KeyValueScanner topScanner = this.heap.peek();
-      if(topScanner == null ||
+      if (topScanner == null ||
           this.comparator.compare(kvNext, topScanner.peek()) > 0) {
         this.heap.add(this.current);
         this.current = this.heap.poll();
@@ -104,10 +107,20 @@ public class KeyValueHeap implements Key
    * @return true if there are more keys, false if all scanners are done
    */
   public boolean next(List<KeyValue> result, int limit) throws IOException {
+    if (this.current == null) {
+      return false;
+    }
     InternalScanner currentAsInternal = (InternalScanner)this.current;
-    currentAsInternal.next(result, limit);
+    boolean mayContainMoreRows = currentAsInternal.next(result, limit);
     KeyValue pee = this.current.peek();
-    if (pee == null) {
+    /*
+     * By definition, an InternalScanner must return false only when it has no
+     * further rows to fetch, so a scanner that returns false can safely be
+     * closed. All existing implementations appear to honor this contract. It
+     * is much more efficient to close scanners that are no longer needed than
+     * to keep them in the heap. This is also required for certain optimizations.
+     */
+    if (pee == null || !mayContainMoreRows) {
       this.current.close();
     } else {
       this.heap.add(this.current);
@@ -160,12 +173,14 @@ public class KeyValueHeap implements Key
   }
 
   public void close() {
-    if(this.current != null) {
+    if (this.current != null) {
       this.current.close();
     }
-    KeyValueScanner scanner;
-    while((scanner = this.heap.poll()) != null) {
-      scanner.close();
+    if (this.heap != null) {
+      KeyValueScanner scanner;
+      while ((scanner = this.heap.poll()) != null) {
+        scanner.close();
+      }
     }
   }
 
@@ -178,10 +193,10 @@ public class KeyValueHeap implements Key
    * automatically closed and removed from the heap.
    * @param seekKey KeyValue to seek at or after
    * @return true if KeyValues exist at or after specified key, false if not
-   * @throws IOException 
+   * @throws IOException
    */
   public boolean seek(KeyValue seekKey) throws IOException {
-    if(this.current == null) {
+    if (this.current == null) {
       return false;
     }
     this.heap.add(this.current);
@@ -205,6 +220,33 @@ public class KeyValueHeap implements Key
     return false;
   }
 
+  public boolean reseek(KeyValue seekKey) throws IOException {
+    // This method is nearly identical to seek(KeyValue), except that
+    // scanner.seek(seekKey) is replaced with scanner.reseek(seekKey).
+    if (this.current == null) {
+      return false;
+    }
+    this.heap.add(this.current);
+    this.current = null;
+
+    KeyValueScanner scanner;
+    while ((scanner = this.heap.poll()) != null) {
+      KeyValue topKey = scanner.peek();
+      if (comparator.getComparator().compare(seekKey, topKey) <= 0) {
+        // Top KeyValue is at-or-after Seek KeyValue
+        this.current = scanner;
+        return true;
+      }
+      if (!scanner.reseek(seekKey)) {
+        scanner.close();
+      } else {
+        this.heap.add(scanner);
+      }
+    }
+    // Heap is returning empty, scanner is done
+    return false;
+  }
+
   /**
    * @return the current Heap
    */

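The KeyValueHeap changes above preserve the scanner-merge discipline: the heap is ordered by each scanner's next KeyValue, the smallest scanner is held out as 'current', and a scanner is closed as soon as it is exhausted (or, now, as soon as its next() returns false). Below is a standalone sketch of that k-way merge, with int cursors standing in for KeyValueScanners; all names here are illustrative, not HBase API.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMergeDemo {
  /** Minimal stand-in for a KeyValueScanner: a cursor over sorted ints. */
  static final class Cursor {
    private final int[] data;
    private int pos;
    Cursor(int... data) { this.data = data; }
    Integer peek() { return pos < data.length ? data[pos] : null; }
    int next() { return data[pos++]; }
  }

  static List<Integer> merge(Cursor... scanners) {
    // Heap ordered by each scanner's next value, the way KVScannerComparator
    // orders scanners by their peek()ed KeyValue.
    PriorityQueue<Cursor> heap = new PriorityQueue<Cursor>(
        Math.max(1, scanners.length),
        new Comparator<Cursor>() {
          public int compare(Cursor a, Cursor b) {
            return a.peek().compareTo(b.peek());
          }
        });
    for (Cursor s : scanners) {
      if (s.peek() != null) heap.add(s); // exhausted scanners are dropped ("closed")
    }
    List<Integer> out = new ArrayList<Integer>();
    Cursor current = heap.poll();        // smallest scanner is held out of the heap
    while (current != null) {
      out.add(current.next());
      Cursor top = heap.peek();
      if (current.peek() == null) {
        current = heap.poll();           // exhausted: close it, take the next smallest
      } else if (top != null && top.peek() < current.peek()) {
        heap.add(current);               // another scanner is now smallest: swap
        current = heap.poll();
      } // otherwise keep streaming from current without touching the heap
    }
    return out;
  }

  public static void main(String[] args) {
    // Merging three sorted scanners yields one sorted stream: [1, 2, ..., 7]
    System.out.println(merge(new Cursor(1, 4, 7), new Cursor(2, 5), new Cursor(3, 6)));
  }
}
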
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java Fri Aug 27 05:01:02 2010
@@ -47,6 +47,16 @@ public interface KeyValueScanner {
   public boolean seek(KeyValue key) throws IOException;
 
   /**
+   * Reseek the scanner at or after the specified KeyValue.
+   * This method is guaranteed to seek at or after the required key only if the
+   * key comes after the current position of the scanner. It should not be used
+   * to seek to a key which may come before the current position.
+   * @param key seek value (should be non-null)
+   * @return true if scanner has values left, false if end of scanner
+   */
+  public boolean reseek(KeyValue key) throws IOException;
+
+  /**
    * Close the KeyValue scanner.
    */
   public void close();

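The reseek() contract added above is forward-only: it can be cheaper than a full seek() precisely because the scanner is allowed to advance from its current position instead of repositioning from scratch (compare MemStoreScanner.reseek() below, which simply walks its iterators forward). A toy illustration of that contract over a sorted array follows; ForwardCursor is hypothetical, not part of HBase.

public class ReseekDemo {
  /** Forward-only cursor over sorted ints, mirroring the reseek() contract. */
  static final class ForwardCursor {
    private final int[] sorted;
    private int pos;
    ForwardCursor(int... sorted) { this.sorted = sorted; }

    Integer peek() { return pos < sorted.length ? sorted[pos] : null; }

    /**
     * Advance until peek() is at or after key. Correct only when key is at
     * or after the current position; never restarts from the beginning.
     * @return true if values remain, false if the cursor is exhausted
     */
    boolean reseek(int key) {
      while (pos < sorted.length && sorted[pos] < key) {
        pos++;
      }
      return pos < sorted.length;
    }
  }

  public static void main(String[] args) {
    ForwardCursor c = new ForwardCursor(1, 3, 5, 7, 9);
    c.reseek(4);
    System.out.println(c.peek()); // 5
    c.reseek(8);
    System.out.println(c.peek()); // 9
    // c.reseek(2) would violate the contract: 2 is before the current position,
    // and a forward-only cursor will not move back to it.
  }
}
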
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Fri Aug 27 05:01:02 2010
@@ -20,18 +20,14 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
-import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
 import java.rmi.UnexpectedException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
-import java.util.Set;
 import java.util.SortedSet;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -39,8 +35,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.regionserver.DeleteCompare.DeleteCode;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 
@@ -81,6 +77,9 @@ public class MemStore implements HeapSiz
   // Used to track own heapSize
   final AtomicLong size;
 
+  TimeRangeTracker timeRangeTracker;
+  TimeRangeTracker snapshotTimeRangeTracker;
+
   /**
    * Default constructor. Used for tests.
    */
@@ -99,6 +98,8 @@ public class MemStore implements HeapSiz
     this.comparatorIgnoreType = this.comparator.getComparatorIgnoringType();
     this.kvset = new KeyValueSkipListSet(c);
     this.snapshot = new KeyValueSkipListSet(c);
+    timeRangeTracker = new TimeRangeTracker();
+    snapshotTimeRangeTracker = new TimeRangeTracker();
     this.size = new AtomicLong(DEEP_OVERHEAD);
   }
 
@@ -128,6 +129,8 @@ public class MemStore implements HeapSiz
         if (!this.kvset.isEmpty()) {
           this.snapshot = this.kvset;
           this.kvset = new KeyValueSkipListSet(this.comparator);
+          this.snapshotTimeRangeTracker = this.timeRangeTracker;
+          this.timeRangeTracker = new TimeRangeTracker();
           // Reset heap to not include any keys
           this.size.set(DEEP_OVERHEAD);
         }
@@ -167,6 +170,7 @@ public class MemStore implements HeapSiz
       // create a new snapshot and let the old one go.
       if (!ss.isEmpty()) {
         this.snapshot = new KeyValueSkipListSet(this.comparator);
+        this.snapshotTimeRangeTracker = new TimeRangeTracker();
       }
     } finally {
       this.lock.writeLock().unlock();
@@ -183,6 +187,7 @@ public class MemStore implements HeapSiz
     this.lock.readLock().lock();
     try {
       s = heapSizeChange(kv, this.kvset.add(kv));
+      timeRangeTracker.includeTimestamp(kv);
       this.size.addAndGet(s);
     } finally {
       this.lock.readLock().unlock();
@@ -198,9 +203,9 @@ public class MemStore implements HeapSiz
   long delete(final KeyValue delete) {
     long s = 0;
     this.lock.readLock().lock();
-
     try {
       s += heapSizeChange(delete, this.kvset.add(delete));
+      timeRangeTracker.includeTimestamp(delete);
     } finally {
       this.lock.readLock().unlock();
     }
@@ -341,6 +346,112 @@ public class MemStore implements HeapSiz
     }
   }
 
+  /**
+   * Given the specs of a column, update it, first by inserting a new record,
+   * then removing the old one.  Since only one KeyValue is involved, its
+   * memstoreTS is set to 0, making it immediately visible to all readers.
+   * The underlying store ensures that the insert and the delete are each
+   * atomic: a reader sees either the new value or the old one, and all
+   * readers eventually see only the new value once the old one is removed.
+   *
+   * @param row row key of the cell to update
+   * @param family column family of the cell to update
+   * @param qualifier column qualifier of the cell to update
+   * @param newValue new long value to store
+   * @param now current timestamp (may be advanced past existing KVs)
+   * @return the change in memstore heap size caused by the update
+   */
+  public long updateColumnValue(byte[] row,
+                                byte[] family,
+                                byte[] qualifier,
+                                long newValue,
+                                long now) {
+    this.lock.readLock().lock();
+    try {
+      KeyValue firstKv = KeyValue.createFirstOnRow(
+          row, family, qualifier);
+      // create a new KeyValue with 'now' and a 0 memstoreTS == immediately visible
+      KeyValue newKv;
+      // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
+      SortedSet<KeyValue> snSs = snapshot.tailSet(firstKv);
+      if (!snSs.isEmpty()) {
+        KeyValue snKv = snSs.first();
+        // is there a matching KV in the snapshot?
+        if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
+          if (snKv.getTimestamp() == now) {
+            // collision with the snapshot's timestamp: bump ours by one
+            now += 1;
+          }
+        }
+      }
+
+      // logic here: the new ts MUST be at least 'now', but it can be larger:
+      // the timestamp should be max(now, mostRecentTsInMemstore).
+
+      // We can't add the new KV without knowing what's there already, and we
+      // also want to take this chance to delete stale KVs. So, two loops.
+
+      SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
+      Iterator<KeyValue> it = ss.iterator();
+      while (it.hasNext()) {
+        KeyValue kv = it.next();
+
+        // if this isn't the row we are interested in, then bail:
+        if (!firstKv.matchingRow(kv)) {
+          break; // rows don't match, bail.
+        }
+
+        // if the qualifier matches and it's a Put, track its timestamp so the
+        // new KeyValue is guaranteed to be the newest for this column.
+        if (firstKv.matchingQualifier(kv)) {
+          if (kv.getType() == KeyValue.Type.Put.getCode()) {
+            now = Math.max(now, kv.getTimestamp());
+          }
+        }
+      }
+
+
+      // Add the new value now. It might share a timestamp with an existing KV,
+      // briefly confusing readers until we erase the old one (and its old value).
+      newKv = new KeyValue(row, family, qualifier,
+          now,
+          Bytes.toBytes(newValue));
+      long addedSize = add(newKv);
+
+      // remove extra versions.
+      ss = kvset.tailSet(firstKv);
+      it = ss.iterator();
+      while (it.hasNext()) {
+        KeyValue kv = it.next();
+
+        if (kv == newKv) {
+          // skip the KeyValue we just inserted
+          continue;
+        }
+
+        // if this isn't the row we are interested in, then bail:
+        if (!firstKv.matchingRow(kv)) {
+          break; // rows don't match, bail.
+        }
+
+        // if the qualifier matches and it's a Put, remove it from the kvset.
+        if (firstKv.matchingQualifier(kv)) {
+          // to be extra safe we only remove Puts that have a memstoreTS==0
+          if (kv.getType() == KeyValue.Type.Put.getCode()) {
+            // false means there was a change, so give us the size.
+            addedSize -= heapSizeChange(kv, false);
+
+            it.remove();
+          }
+        }
+      }
+
+      return addedSize;
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
   /*
    * Immutable data structure to hold member found in set and the set it was
    * found in.  Include set because it is carrying context.
@@ -390,104 +501,20 @@ public class MemStore implements HeapSiz
     }
   }
 
-  //
-  // HBASE-880/1249/1304
-  //
-
-  /**
-   * Perform a single-row Get on the  and snapshot, placing results
-   * into the specified KV list.
-   * <p>
-   * This will return true if it is determined that the query is complete
-   * and it is not necessary to check any storefiles after this.
-   * <p>
-   * Otherwise, it will return false and you should continue on.
-   * @param matcher Column matcher
-   * @param result List to add results to
-   * @return true if done with store (early-out), false if not
-   */
-  public boolean get(QueryMatcher matcher, List<KeyValue> result) {
-    this.lock.readLock().lock();
-    try {
-      if(internalGet(this.kvset, matcher, result) || matcher.isDone()) {
-        return true;
-      }
-      matcher.update();
-      return internalGet(this.snapshot, matcher, result) || matcher.isDone();
-    } finally {
-      this.lock.readLock().unlock();
-    }
-  }
-
   /**
-   * Gets from either the memstore or the snapshop, and returns a code
-   * to let you know which is which.
-   *
-   * @param matcher query matcher
-   * @param result puts results here
-   * @return 1 == memstore, 2 == snapshot, 0 == none
+   * Check if this memstore may contain keys in the scan's time range.
+   * @param scan the scan whose time range is checked
+   * @return false if no key in the scan's time range can exist in this Memstore
    */
-  int getWithCode(QueryMatcher matcher, List<KeyValue> result) {
-    this.lock.readLock().lock();
-    try {
-      boolean fromMemstore = internalGet(this.kvset, matcher, result);
-      if (fromMemstore || matcher.isDone())
-        return 1;
-
-      matcher.update();
-      boolean fromSnapshot = internalGet(this.snapshot, matcher, result);
-      if (fromSnapshot || matcher.isDone())
-        return 2;
-
-      return 0;
-    } finally {
-      this.lock.readLock().unlock();
-    }
+  public boolean shouldSeek(Scan scan) {
+    return timeRangeTracker.includesTimeRange(scan.getTimeRange()) ||
+        snapshotTimeRangeTracker.includesTimeRange(scan.getTimeRange());
   }
 
-  /**
-   * Small utility functions for use by Store.incrementColumnValue
-   * _only_ under the threat of pain and everlasting race conditions.
-   */
-  void readLockLock() {
-    this.lock.readLock().lock();
-  }
-  void readLockUnlock() {
-    this.lock.readLock().unlock();
+  public TimeRangeTracker getSnapshotTimeRangeTracker() {
+    return this.snapshotTimeRangeTracker;
   }
 
-  /**
-   *
-   * @param set memstore or snapshot
-   * @param matcher query matcher
-   * @param result list to add results to
-   * @return true if done with store (early-out), false if not
-   */
-  boolean internalGet(final NavigableSet<KeyValue> set,
-      final QueryMatcher matcher, final List<KeyValue> result) {
-    if(set.isEmpty()) return false;
-    // Seek to startKey
-    SortedSet<KeyValue> tail = set.tailSet(matcher.getStartKey());
-    for (KeyValue kv : tail) {
-      QueryMatcher.MatchCode res = matcher.match(kv);
-      switch(res) {
-        case INCLUDE:
-          result.add(kv);
-          break;
-        case SKIP:
-          break;
-        case NEXT:
-          return false;
-        case DONE:
-          return true;
-        default:
-          throw new RuntimeException("Unexpected " + res);
-      }
-    }
-    return false;
-  }
-
-
   /*
    * MemStoreScanner implements the KeyValueScanner.
    * It lets the caller scan the contents of a memstore -- both current
@@ -520,7 +547,7 @@ public class MemStore implements HeapSiz
       StoreScanner level with coordination with MemStoreScanner.
 
     */
-    
+
     MemStoreScanner() {
       super();
 
@@ -531,7 +558,7 @@ public class MemStore implements HeapSiz
       KeyValue ret = null;
       long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
       //DebugPrint.println( " MS@" + hashCode() + ": threadpoint = " + readPoint);
-      
+
       while (ret == null && it.hasNext()) {
         KeyValue v = it.next();
         if (v.getMemstoreTS() <= readPoint) {
@@ -566,13 +593,27 @@ public class MemStore implements HeapSiz
       //DebugPrint.println( " MS@" + hashCode() + " snapshot seek: " + snapshotNextRow + " with size = " +
       //    snapshot.size() + " threadread = " + readPoint);
 
-      
+
       KeyValue lowest = getLowest();
 
       // has data := (lowest != null)
       return lowest != null;
     }
 
+    @Override
+    public boolean reseek(KeyValue key) {
+      while (kvsetNextRow != null &&
+          comparator.compare(kvsetNextRow, key) < 0) {
+        kvsetNextRow = getNext(kvsetIt);
+      }
+
+      while (snapshotNextRow != null &&
+          comparator.compare(snapshotNextRow, key) < 0) {
+        snapshotNextRow = getNext(snapshotIt);
+      }
+      return (kvsetNextRow != null || snapshotNextRow != null);
+    }
+
     public synchronized KeyValue peek() {
       //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
       return getLowest();
@@ -630,8 +671,8 @@ public class MemStore implements HeapSiz
   }
 
   public final static long FIXED_OVERHEAD = ClassSize.align(
-      ClassSize.OBJECT + (7 * ClassSize.REFERENCE));
-  
+      ClassSize.OBJECT + (9 * ClassSize.REFERENCE));
+
   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
       ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST +

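The TimeRangeTracker fields added above let shouldSeek() skip the memstore (or its snapshot) whenever the scan's time range cannot overlap any timestamp ever written to it. The core of that pruning is a simple interval-overlap test; a minimal sketch follows, with hypothetical names, and glossing over the exact inclusivity conventions of HBase's TimeRange.

public class TimeRangePruningDemo {
  /** Tracks the [min, max] interval of every timestamp written. */
  static final class Tracker {
    private long min = Long.MAX_VALUE;
    private long max = Long.MIN_VALUE;

    void includeTimestamp(long ts) {
      if (ts < min) min = ts;
      if (ts > max) max = ts;
    }

    /** True if the half-open query range [from, to) may contain a tracked ts. */
    boolean includesTimeRange(long from, long to) {
      return min < to && max >= from;
    }
  }

  public static void main(String[] args) {
    Tracker t = new Tracker();
    t.includeTimestamp(100);
    t.includeTimestamp(250);
    System.out.println(t.includesTimeRange(200, 300)); // true: overlaps [100, 250]
    System.out.println(t.includesTimeRange(300, 400)); // false: store can be skipped
  }
}
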
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java Fri Aug 27 05:01:02 2010
@@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.regionse
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.hfile.HFile;
 
 import java.io.IOException;
 import java.util.List;
@@ -73,6 +72,10 @@ public class MinorCompactingStoreScanner
     throw new UnsupportedOperationException("Can't seek a MinorCompactingStoreScanner");
   }
 
+  public boolean reseek(KeyValue key) {
+    return seek(key);
+  }
+
   /**
    * High performance merge scan.
    * @param writer

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java Fri Aug 27 05:01:02 2010
@@ -41,18 +41,39 @@ public class ReadWriteConsistencyControl
   private static final ThreadLocal<Long> perThreadReadPoint =
       new ThreadLocal<Long>();
 
+  /**
+   * Get this thread's read point. Used primarily by the memstore scanner to
+   * know which values to skip (i.e., values not yet completed/committed to
+   * the memstore).
+   */
   public static long getThreadReadPoint() {
     return perThreadReadPoint.get();
   }
 
+  /**
+   * Set the thread read point to the given value. The thread's read point
+   * is used by the MemStore scanner so it knows which values to skip.
+   * Pass 0 to include everything.
+   */
   public static void setThreadReadPoint(long readPoint) {
     perThreadReadPoint.set(readPoint);
   }
 
+  /**
+   * Set the thread RWCC read point to whatever the current read point is in
+   * this particular instance of RWCC.  Returns the new thread read point value.
+   */
   public static long resetThreadReadPoint(ReadWriteConsistencyControl rwcc) {
     perThreadReadPoint.set(rwcc.memstoreReadPoint());
     return getThreadReadPoint();
   }
+  
+  /**
+   * Set the thread RWCC read point to 0 (include everything).
+   */
+  public static void resetThreadReadPoint() {
+    perThreadReadPoint.set(0L);
+  }
 
   public WriteEntry beginMemstoreInsert() {
     synchronized (writeQueue) {