You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2009/04/12 12:39:58 UTC

svn commit: r764289 [4/8] - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/filter/ src/java/org/apache/hadoop/hbase/io/ src/java/org/apache/hadoop/hbase/io/hfile/ s...

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java?rev=764289&r1=764288&r2=764289&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java Sun Apr 12 10:39:55 2009
@@ -25,122 +25,135 @@
 import java.lang.management.RuntimeMXBean;
 import java.rmi.UnexpectedException;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Set;
 import java.util.SortedMap;
+import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HStoreKey;
-import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.HRegion.Counter;
 import org.apache.hadoop.hbase.util.Bytes;
 
-
 /**
- * The Memcache holds in-memory modifications to the HRegion.
- * Keeps a current map.  When asked to flush the map, current map is moved
- * to snapshot and is cleared.  We continue to serve edits out of new map
+ * The Memcache holds in-memory modifications to the HRegion.  Modifications
+ * are {@link KeyValue}s.  When asked to flush, current memcache is moved
+ * to snapshot and is cleared.  We continue to serve edits out of new memcache
  * and backing snapshot until flusher reports in that the flush succeeded. At
  * this point we let the snapshot go.
+ * TODO: Adjust size of the memcache when we remove items because they have
+ * been deleted.
  */
 class Memcache {
   private static final Log LOG = LogFactory.getLog(Memcache.class);
-  
+
   private final long ttl;
 
   // Note that since these structures are always accessed with a lock held,
-  // so no additional synchronization is required.
+  // no additional synchronization is required.
   
-  // The currently active sorted map of edits.
-  private volatile SortedMap<HStoreKey, byte[]> memcache;
+  // The currently active sorted set of edits.  Using explicit type because
+  // if I use NavigableSet, I lose some facility -- I can't get a NavigableSet
+  // when I do tailSet or headSet.
+  volatile ConcurrentSkipListSet<KeyValue> memcache;
 
   // Snapshot of memcache.  Made for flusher.
-  private volatile SortedMap<HStoreKey, byte[]> snapshot;
+  volatile ConcurrentSkipListSet<KeyValue> snapshot;
 
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
-  private final Comparator<HStoreKey> comparator;
-  private final HStoreKey.StoreKeyComparator rawcomparator;
+  final KeyValue.KVComparator comparator;
+
+  // Used comparing versions -- same r/c and ts but different type.
+  final KeyValue.KVComparator comparatorIgnoreType;
+
+  // Used comparing versions -- same r/c and type but different timestamp.
+  final KeyValue.KVComparator comparatorIgnoreTimestamp;
+
+  // TODO: Fix this guess by studying jprofiler
+  private final static int ESTIMATED_KV_HEAP_TAX = 60;
 
   /**
    * Default constructor. Used for tests.
    */
-  @SuppressWarnings("unchecked")
   public Memcache() {
-    this(HConstants.FOREVER, new HStoreKey.HStoreKeyComparator(),
-      new HStoreKey.StoreKeyComparator());
+    this(HConstants.FOREVER, KeyValue.COMPARATOR);
   }
 
   /**
    * Constructor.
    * @param ttl The TTL for cache entries, in milliseconds.
-   * @param c 
-   * @param rc
+   * @param c
    */
-  public Memcache(final long ttl, final Comparator<HStoreKey> c,
-      final HStoreKey.StoreKeyComparator rc) {
+  public Memcache(final long ttl, final KeyValue.KVComparator c) {
     this.ttl = ttl;
     this.comparator = c;
-    this.rawcomparator = rc;
-    this.memcache = createSynchronizedSortedMap(c);
-    this.snapshot = createSynchronizedSortedMap(c);
+    this.comparatorIgnoreTimestamp =
+      this.comparator.getComparatorIgnoringTimestamps();
+    this.comparatorIgnoreType = this.comparator.getComparatorIgnoringType();
+    this.memcache = createSet(c);
+    this.snapshot = createSet(c);
   }
 
-  /*
-   * Utility method using HSKWritableComparator
-   * @return synchronized sorted map of HStoreKey to byte arrays.
-   */
-  private SortedMap<HStoreKey, byte[]> createSynchronizedSortedMap(final Comparator<HStoreKey> c) {
-    return Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>(c));
+  static ConcurrentSkipListSet<KeyValue> createSet(final KeyValue.KVComparator c) {
+    return new ConcurrentSkipListSet<KeyValue>(c);
+  }
+
+  void dump() {
+    for (KeyValue kv: this.memcache) {
+      LOG.info(kv);
+    }
+    for (KeyValue kv: this.snapshot) {
+      LOG.info(kv);
+    }
   }
 
   /**
    * Creates a snapshot of the current Memcache.
    * Snapshot must be cleared by call to {@link #clearSnapshot(SortedMap)}
-   * To get the snapshot made by this method, use
-   * {@link #getSnapshot}.
+   * To get the snapshot made by this method, use {@link #getSnapshot}.
    */
   void snapshot() {
     this.lock.writeLock().lock();
     try {
       // If snapshot currently has entries, then flusher failed or didn't call
       // cleanup.  Log a warning.
-      if (this.snapshot.size() > 0) {
-        LOG.debug("Snapshot called again without clearing previous. " +
+      if (!this.snapshot.isEmpty()) {
+        LOG.warn("Snapshot called again without clearing previous. " +
           "Doing nothing. Another ongoing flush or did we fail last attempt?");
       } else {
         // We used to synchronize on the memcache here but we're inside a
         // write lock so removed it. Comment is left in case removal was a
         // mistake. St.Ack
-        if (this.memcache.size() != 0) {
+        if (!this.memcache.isEmpty()) {
           this.snapshot = this.memcache;
-          this.memcache = createSynchronizedSortedMap(this.comparator);
+          this.memcache = createSet(this.comparator);
         }
       }
     } finally {
       this.lock.writeLock().unlock();
     }
   }
-  
+
   /**
    * Return the current snapshot.
    * Called by flusher to get current snapshot made by a previous
    * call to {@link snapshot}.
    * @return Return snapshot.
    * @see {@link #snapshot()}
-   * @see {@link #clearSnapshot(SortedMap)}
+   * @see {@link #clearSnapshot(Set)}
    */
-  SortedMap<HStoreKey, byte[]> getSnapshot() {
+  ConcurrentSkipListSet<KeyValue> getSnapshot() {
     return this.snapshot;
   }
 
@@ -150,7 +163,7 @@
    * @throws UnexpectedException
    * @see {@link #snapshot()}
    */
-  void clearSnapshot(final SortedMap<HStoreKey, byte []> ss)
+  void clearSnapshot(final Set<KeyValue> ss)
   throws UnexpectedException {
     this.lock.writeLock().lock();
     try {
@@ -160,8 +173,8 @@
       }
       // OK. Passed in snapshot is same as current snapshot.  If not-empty,
       // create a new snapshot and let the old one go.
-      if (ss.size() != 0) {
-        this.snapshot = createSynchronizedSortedMap(this.comparator);
+      if (!ss.isEmpty()) {
+        this.snapshot = createSet(this.comparator);
       }
     } finally {
       this.lock.writeLock().unlock();
@@ -170,143 +183,121 @@
 
   /**
    * Write an update
-   * @param key
-   * @param value
-   * @return memcache Approximate size of the passed key and value.  Includes
-   * cost of hosting HSK and byte arrays as well as the Map.Entry this addition
-   * costs when we insert into the backing TreeMap.
+   * @param kv
+   * @return approximate size of the passed key and value.
    */
-  long add(final HStoreKey key, final byte[] value) {
+  long add(final KeyValue kv) {
     long size = -1;
     this.lock.readLock().lock();
     try {
-      byte [] oldValue = this.memcache.remove(key);
-      this.memcache.put(key, value);
-      size = heapSize(key, value, oldValue);
+      boolean notpresent = this.memcache.add(kv);
+      size = heapSize(kv, notpresent);
     } finally {
       this.lock.readLock().unlock();
     }
     return size;
   }
-  
+
   /*
-   * Calcuate how the memcache size has changed, approximately.
-   * Add in tax of TreeMap.Entry.
-   * @param key
-   * @param value
-   * @param oldValue
-   * @return
-   */
-  long heapSize(final HStoreKey key, final byte [] value,
-      final byte [] oldValue) {
-    // First add value length.
-    long keySize = key.heapSize();
-    // Add value.
-    long size = value == null? 0: value.length;
-    if (oldValue == null) {
-      size += keySize;
+   * Calculate how the memcache size has changed, approximately.  Be careful.
+   * If class changes, be sure to change the size calculation.
+   * Add in tax of Map.Entry.
+   * @param kv
+   * @param notpresent True if the kv was NOT present in the set.
+   * @return Size
+   */
+  long heapSize(final KeyValue kv, final boolean notpresent) {
+    return notpresent?
       // Add overhead for value byte array and for Map.Entry -- 57 bytes
       // on x64 according to jprofiler.
-      size += Bytes.ESTIMATED_HEAP_TAX + 57;
-    } else {
-      // If old value, don't add overhead again nor key size. Just add
-      // difference in  value sizes.
-      size -= oldValue.length;
-    }
-    return size;
+      ESTIMATED_KV_HEAP_TAX + 57 + kv.getLength(): 0; // Guess no change in size.
   }
 
   /**
    * Look back through all the backlog TreeMaps to find the target.
-   * @param key
+   * @param kv
    * @param numVersions
-   * @return An array of byte arrays ordered by timestamp.
+   * @return List of KeyValues.  Empty list, not null, if no results.
    */
-  List<Cell> get(final HStoreKey key, final int numVersions) {
-    return get(key, numVersions, null, System.currentTimeMillis());
+  List<KeyValue> get(final KeyValue kv, final int numVersions) {
+    List<KeyValue> results = new ArrayList<KeyValue>();
+    get(kv, numVersions, results,
+      new TreeSet<KeyValue>(this.comparatorIgnoreType),
+      System.currentTimeMillis());
+    return results;
   }
-  
+
   /**
    * Look back through all the backlog TreeMaps to find the target.
    * @param key
-   * @param numVersions
-   * @param deletes
+   * @param versions
+   * @param results
+   * @param deletes Pass a Set that has a Comparator that ignores key type.
    * @param now
-   * @return An array of byte arrays ordered by timestamp.
+   * @return True if enough versions.
    */
-  List<Cell> get(final HStoreKey key, final int numVersions,
-      final Set<HStoreKey> deletes, final long now) {
+  boolean get(final KeyValue key, final int versions,
+      List<KeyValue> results, final NavigableSet<KeyValue> deletes,
+      final long now) {
     this.lock.readLock().lock();
     try {
-      List<Cell> results;
-      // The synchronizations here are because the below get iterates
-      synchronized (this.memcache) {
-        results = get(this.memcache, key, numVersions, deletes, now);
-      }
-      synchronized (this.snapshot) {
-        results.addAll(results.size(),
-          get(this.snapshot, key, numVersions - results.size(), deletes, now));
+      if (get(this.memcache, key, versions, results, deletes, now)) {
+        return true;
       }
-      return results;
+      return get(this.snapshot, key, versions , results, deletes, now);
     } finally {
       this.lock.readLock().unlock();
     }
   }
-  
+
   /**
+   * @param kv Find the row that comes after this one.  If null, we return the
+   * first.
+   * @return Next row or null if none found.
+   */
+  KeyValue getNextRow(final KeyValue kv) {
+    this.lock.readLock().lock();
+    try {
+      return getLowest(getNextRow(kv, this.memcache),
+        getNextRow(kv, this.snapshot));
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
+  /*
    * @param a
    * @param b
    * @return Return lowest of a or b or null if both a and b are null
    */
-  private byte [] getLowest(final byte [] a,
-      final byte [] b) {
+  private KeyValue getLowest(final KeyValue a, final KeyValue b) {
     if (a == null) {
       return b;
     }
     if (b == null) {
       return a;
     }
-    return this.rawcomparator.compareRows(a, b) <= 0? a: b;
+    return comparator.compareRows(a, b) <= 0? a: b;
   }
 
-  /**
-   * @param row Find the row that comes after this one.
-   * @return Next row or null if none found
-   */
-  byte [] getNextRow(final byte [] row) {
-    this.lock.readLock().lock();
-    try {
-      return getLowest(getNextRow(row, this.memcache),
-        getNextRow(row, this.snapshot));
-    } finally {
-      this.lock.readLock().unlock();
-    }
-  }
-  
   /*
-   * @param row Find row that follows this one.
-   * @param map Map to look in for a row beyond <code>row</code>.
-   * This method synchronizes on passed map while iterating it.
-   * @return Next row or null if none found.
-   */
-  private byte [] getNextRow(final byte [] row,
-      final SortedMap<HStoreKey, byte []> map) {
-    byte [] result = null;
-    // Synchronize on the map to make the tailMap making 'safe'.
-    synchronized (map) {
-      // Make an HSK with maximum timestamp so we get past most of the current
-      // rows cell entries.
-      HStoreKey hsk = new HStoreKey(row, HConstants.LATEST_TIMESTAMP);
-      SortedMap<HStoreKey, byte []> tailMap = map.tailMap(hsk);
-      // Iterate until we fall into the next row; i.e. move off current row
-      for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
-        HStoreKey itKey = es.getKey();
-        if (this.rawcomparator.compareRows(itKey.getRow(), row) <= 0)
-          continue;
-        // Note: Not suppressing deletes or expired cells.
-        result = itKey.getRow();
-        break;
-      }
+   * @param kv Find row that follows this one.  If null, return first.
+   * @param set Set to look in for a row beyond <code>kv</code>.
+   * @return Next row or null if none found.  If one found, will be a new
+   * KeyValue -- can be destroyed by subsequent calls to this method.
+   */
+  private KeyValue getNextRow(final KeyValue kv,
+      final NavigableSet<KeyValue> set) {
+    KeyValue result = null;
+    SortedSet<KeyValue> tailset = kv == null? set: set.tailSet(kv);
+    // Iterate until we fall into the next row; i.e. move off current row
+    for (KeyValue i : tailset) {
+      if (comparator.compareRows(i, kv) <= 0)
+        continue;
+      // Note: Not suppressing deletes or expired cells.  Needs to be handled
+      // by higher up functions.
+      result = i;
+      break;
     }
     return result;
   }
@@ -314,225 +305,223 @@
   /**
    * Return all the available columns for the given key.  The key indicates a 
    * row and timestamp, but not a column name.
-   * @param key
+   * @param origin Where to start searching.  Specifies a row and timestamp.
+   * Columns are specified in following arguments.
    * @param columns Pass null for all columns else the wanted subset.
+   * @param columnPattern Column pattern to match.
    * @param numVersions number of versions to retrieve
-   * @param deletes Map to accumulate deletes found.
+   * @param versionsCount Map of KV to Count.  Uses a Comparator that doesn't
+   * look at timestamps so only Row/Column are compared.
+   * @param deletes Pass a Set that has a Comparator that ignores key type.
    * @param results Where to stick row results found.
+   * @return True if we found enough results for passed <code>columns</code>
+   * and <code>numVersions</code>.
    */
-  void getFull(HStoreKey key, Set<byte []> columns, int numVersions,
-    Map<byte [], Long> deletes, Map<byte [], Cell> results) {
+  boolean getFull(final KeyValue key, NavigableSet<byte []> columns,
+      final Pattern columnPattern,
+      int numVersions, final Map<KeyValue, HRegion.Counter> versionsCount,
+      final NavigableSet<KeyValue> deletes,
+      final List<KeyValue> results, final long now) {
     this.lock.readLock().lock();
     try {
-      // The synchronizations here are because internalGet iterates
-      synchronized (this.memcache) {
-        internalGetFull(this.memcache, key, columns, numVersions, deletes, results);
-      }
-      synchronized (this.snapshot) {
-        internalGetFull(this.snapshot, key, columns, numVersions, deletes, results);
+      // Was synchronized; not needed now that iteration is weakly consistent.
+      if (getFull(this.memcache, key, columns, columnPattern, numVersions,
+        versionsCount, deletes, results, now)) {
+        // Has enough results.
+        return true;
       }
+      return getFull(this.snapshot, key, columns, columnPattern, numVersions,
+        versionsCount, deletes, results, now);
     } finally {
       this.lock.readLock().unlock();
     }
   }
 
-  private void internalGetFull(SortedMap<HStoreKey, byte[]> map, HStoreKey key, 
-      Set<byte []> columns, int numVersions, Map<byte [], Long> deletes,
-      Map<byte [], Cell> results) {
-    if (map.isEmpty() || key == null) {
-      return;
-    }
-    List<HStoreKey> victims = new ArrayList<HStoreKey>();
-    SortedMap<HStoreKey, byte[]> tailMap = map.tailMap(key);
-    long now = System.currentTimeMillis();
-    for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
-      HStoreKey itKey = es.getKey();
-      byte [] itCol = itKey.getColumn();
-      Cell cell = results.get(itCol);
-      if ((cell == null || cell.getNumValues() < numVersions) &&
-          key.matchesWithoutColumn(itKey)) {
-        if (columns == null || columns.contains(itKey.getColumn())) {
-          byte [] val = tailMap.get(itKey);
-          if (HLogEdit.isDeleted(val)) {
-            if (!deletes.containsKey(itCol)
-              || deletes.get(itCol).longValue() < itKey.getTimestamp()) {
-              deletes.put(itCol, Long.valueOf(itKey.getTimestamp()));
-            }
-          } else if (!(deletes.containsKey(itCol) 
-              && deletes.get(itCol).longValue() >= itKey.getTimestamp())) {
-            // Skip expired cells
-            if (ttl == HConstants.FOREVER ||
-                  now < itKey.getTimestamp() + ttl) {
-              if (cell == null) {
-                results.put(itCol, new Cell(val, itKey.getTimestamp()));
-              } else {
-                cell.add(val, itKey.getTimestamp());
-              }
-            } else {
-              addVictim(victims, itKey);
-            }
-          }
-        }
-      } else if (this.rawcomparator.compareRows(key.getRow(), itKey.getRow()) < 0) {
+  /*
+   * @param set
+   * @param target Where to start searching.
+   * @param columns
+   * @param versions
+   * @param versionCounter
+   * @param deletes Pass a Set that has a Comparator that ignores key type.
+   * @param keyvalues
+   * @return True if enough results found.
+   */
+  private boolean getFull(final ConcurrentSkipListSet<KeyValue> set,
+      final KeyValue target, final Set<byte []> columns,
+      final Pattern columnPattern,
+      final int versions, final Map<KeyValue, HRegion.Counter> versionCounter,
+      final NavigableSet<KeyValue> deletes, List<KeyValue> keyvalues,
+      final long now) {
+    boolean hasEnough = false;
+    if (target == null) {
+      return hasEnough;
+    }
+    NavigableSet<KeyValue> tailset = set.tailSet(target);
+    if (tailset == null || tailset.isEmpty()) {
+      return hasEnough;
+    }
+    // TODO: This loop same as in HStore.getFullFromStoreFile.  Make sure they
+    // are the same.
+    for (KeyValue kv: tailset) {
+      // Make sure we have not gone past the row.  If the target key has a
+      // column on it, we are looking for an explicit row+column combination;
+      // break once we have gone past that too.
+      if (target.isEmptyColumn()? !this.comparator.matchingRows(target, kv):
+          !this.comparator.matchingRowColumn(target, kv)) {
+        break;
+      }
+      if (!Store.getFullCheck(this.comparator, target, kv, columns, columnPattern)) {
+        continue;
+      }
+      if (Store.doKeyValue(kv, versions, versionCounter, columns, deletes, now,
+          this.ttl, keyvalues, tailset)) {
+        hasEnough = true;
         break;
       }
     }
-    // Remove expired victims from the map.
-    for (HStoreKey v: victims) {
-      map.remove(v);
-    }
+    return hasEnough;
   }
 
   /**
    * @param row Row to look for.
    * @param candidateKeys Map of candidate keys (Accumulation over lots of
    * lookup over stores and memcaches)
-   * @param deletes Deletes collected so far.
    */
-  void getRowKeyAtOrBefore(final byte [] row,
-      final SortedMap<HStoreKey, Long> candidateKeys) {
-    getRowKeyAtOrBefore(row, candidateKeys, new HashSet<HStoreKey>());
+  void getRowKeyAtOrBefore(final KeyValue row,
+      final NavigableSet<KeyValue> candidateKeys) {
+    getRowKeyAtOrBefore(row, candidateKeys,
+      new TreeSet<KeyValue>(this.comparator), System.currentTimeMillis());
   }
-  
+
   /**
-   * @param row Row to look for.
-   * @param candidateKeys Map of candidate keys (Accumulation over lots of
-   * lookup over stores and memcaches)
-   * @param deletes Deletes collected so far.
-   */
-  void getRowKeyAtOrBefore(final byte [] row,
-      final SortedMap<HStoreKey, Long> candidateKeys, 
-      final Set<HStoreKey> deletes) {
+   * @param kv Row to look for.
+   * @param candidates Map of candidate keys (Accumulation over lots of
+   * lookup over stores and memcaches).  Pass a Set with a Comparator that
+   * ignores key Type so we can do Set.remove using a delete, i.e. a KeyValue
+   * with a different Type to the candidate key.
+   * @param deletes Pass a Set that has a Comparator that ignores key type.
+   */
+  void getRowKeyAtOrBefore(final KeyValue kv,
+      final NavigableSet<KeyValue> candidates, 
+      final NavigableSet<KeyValue> deletes, final long now) {
     this.lock.readLock().lock();
     try {
-      synchronized (memcache) {
-        getRowKeyAtOrBefore(memcache, row, candidateKeys, deletes);
-      }
-      synchronized (snapshot) {
-        getRowKeyAtOrBefore(snapshot, row, candidateKeys, deletes);
-      }
+      getRowKeyAtOrBefore(memcache, kv, candidates, deletes, now);
+      getRowKeyAtOrBefore(snapshot, kv, candidates, deletes, now);
     } finally {
       this.lock.readLock().unlock();
     }
   }
 
-  private void getRowKeyAtOrBefore(final SortedMap<HStoreKey, byte []> map,
-      final byte [] row, final SortedMap<HStoreKey, Long> candidateKeys,
-      final Set<HStoreKey> deletes) {
+  private void getRowKeyAtOrBefore(final ConcurrentSkipListSet<KeyValue> set,
+      final KeyValue kv, final NavigableSet<KeyValue> candidates,
+      final NavigableSet<KeyValue> deletes, final long now) {
+    if (set.isEmpty()) {
+      return;
+    }
     // We want the earliest possible to start searching from.  Start before
     // the candidate key in case it turns out a delete came in later.
-    HStoreKey search_key = candidateKeys.isEmpty()?
-     new HStoreKey(row):
-     new HStoreKey(candidateKeys.firstKey().getRow());
-    List<HStoreKey> victims = new ArrayList<HStoreKey>();
-    long now = System.currentTimeMillis();
+    KeyValue search = candidates.isEmpty()? kv: candidates.first();
 
     // Get all the entries that come equal or after our search key
-    SortedMap<HStoreKey, byte []> tailMap = map.tailMap(search_key);
+    SortedSet<KeyValue> tailset = set.tailSet(search);
 
     // if there are items in the tail map, there's either a direct match to
     // the search key, or a range of values between the first candidate key
     // and the ultimate search key (or the end of the cache)
-    if (!tailMap.isEmpty() &&
-        this.rawcomparator.compareRows(tailMap.firstKey().getRow(),
-          search_key.getRow()) <= 0) {
-      Iterator<HStoreKey> key_iterator = tailMap.keySet().iterator();
-
+    if (!tailset.isEmpty() &&
+        this.comparator.compareRows(tailset.first(), search) <= 0) {
       // Keep looking at cells as long as they are no greater than the 
       // ultimate search key and there's still records left in the map.
-      HStoreKey deletedOrExpiredRow = null;
-      for (HStoreKey found_key = null; key_iterator.hasNext() &&
-          (found_key == null ||
-            this.rawcomparator.compareRows(found_key.getRow(), row) <= 0);) {
-        found_key = key_iterator.next();
-        if (this.rawcomparator.compareRows(found_key.getRow(), row) <= 0) {
-          if (HLogEdit.isDeleted(tailMap.get(found_key))) {
-            Store.handleDeleted(found_key, candidateKeys, deletes);
-            if (deletedOrExpiredRow == null) {
-              deletedOrExpiredRow = found_key;
+      KeyValue deleted = null;
+      KeyValue found = null;
+      for (Iterator<KeyValue> iterator = tailset.iterator();
+        iterator.hasNext() && (found == null ||
+          this.comparator.compareRows(found, kv) <= 0);) {
+        found = iterator.next();
+        if (this.comparator.compareRows(found, kv) <= 0) {
+          if (found.isDeleteType()) {
+            Store.handleDeletes(found, candidates, deletes);
+            if (deleted == null) {
+              deleted = found;
             }
           } else {
-            if (Store.notExpiredAndNotInDeletes(this.ttl, 
-                found_key, now, deletes)) {
-              candidateKeys.put(stripTimestamp(found_key),
-                Long.valueOf(found_key.getTimestamp()));
+            if (Store.notExpiredAndNotInDeletes(this.ttl, found, now, deletes)) {
+              candidates.add(found);
             } else {
-              if (deletedOrExpiredRow == null) {
-                deletedOrExpiredRow = new HStoreKey(found_key);
+              if (deleted == null) {
+                deleted = found;
               }
-              addVictim(victims, found_key);
+              // TODO: Check this removes the right key.
+              // It's expired.  Remove it.
+              iterator.remove();
             }
           }
         }
       }
-      if (candidateKeys.isEmpty() && deletedOrExpiredRow != null) {
-        getRowKeyBefore(map, deletedOrExpiredRow, candidateKeys, victims,
-          deletes, now);
+      if (candidates.isEmpty() && deleted != null) {
+        getRowKeyBefore(set, deleted, candidates, deletes, now);
       }
     } else {
       // The tail didn't contain any keys that matched our criteria, or was 
       // empty. Examine all the keys that proceed our splitting point.
-      getRowKeyBefore(map, search_key, candidateKeys, victims, deletes, now);
-    }
-    // Remove expired victims from the map.
-    for (HStoreKey victim: victims) {
-      map.remove(victim);
+      getRowKeyBefore(set, search, candidates, deletes, now);
     }
   }
-  
+
   /*
    * Get row key that comes before passed <code>search_key</code>
    * Use when we know search_key is not in the map and we need to search
    * earlier in the cache.
-   * @param map
-   * @param search_key
-   * @param candidateKeys
-   * @param victims
-   */
-  private void getRowKeyBefore(SortedMap<HStoreKey, byte []> map,
-      HStoreKey search_key, SortedMap<HStoreKey, Long> candidateKeys,
-      final List<HStoreKey> expires, final Set<HStoreKey> deletes,
-      final long now) {
-    SortedMap<HStoreKey, byte []> headMap = map.headMap(search_key);
+   * @param set
+   * @param search
+   * @param candidates
+   * @param deletes Pass a Set that has a Comparator that ignores key type.
+   * @param now
+   */
+  private void getRowKeyBefore(ConcurrentSkipListSet<KeyValue> set,
+      KeyValue search, NavigableSet<KeyValue> candidates,
+      final NavigableSet<KeyValue> deletes, final long now) {
+    NavigableSet<KeyValue> headSet = set.headSet(search);
     // If we tried to create a headMap and got an empty map, then there are
     // no keys at or before the search key, so we're done.
-    if (headMap.isEmpty()) {
+    if (headSet.isEmpty()) {
       return;
     }
 
     // If there aren't any candidate keys at this point, we need to search
     // backwards until we find at least one candidate or run out of headMap.
-    if (candidateKeys.isEmpty()) {
-      Set<HStoreKey> keys = headMap.keySet();
-      HStoreKey [] cells = keys.toArray(new HStoreKey[keys.size()]);
-      byte [] lastRowFound = null;
-      for (int i = cells.length - 1; i >= 0; i--) {
-        HStoreKey found_key = cells[i];
+    if (candidates.isEmpty()) {
+      KeyValue lastFound = null;
+      for (Iterator<KeyValue> i = headSet.descendingIterator(); i.hasNext();) {
+        KeyValue found = i.next();
         // if the last row we found a candidate key for is different than
         // the row of the current candidate, we can stop looking -- if its
         // not a delete record.
-        boolean deleted = HLogEdit.isDeleted(headMap.get(found_key));
-        if (lastRowFound != null &&
-            !HStoreKey.equalsTwoRowKeys(lastRowFound, found_key.getRow()) &&
-            !deleted) {
+        boolean deleted = found.isDeleteType();
+        if (lastFound != null &&
+            this.comparator.matchingRows(lastFound, found) && !deleted) {
           break;
         }
         // If this isn't a delete, record it as a candidate key. Also 
-        // take note of the row of this candidate so that we'll know when
+        // take note of this candidate so that we'll know when
         // we cross the row boundary into the previous row.
         if (!deleted) {
-          if (Store.notExpiredAndNotInDeletes(this.ttl, found_key, now, deletes)) {
-            lastRowFound = found_key.getRow();
-            candidateKeys.put(stripTimestamp(found_key), 
-              Long.valueOf(found_key.getTimestamp()));
+          if (Store.notExpiredAndNotInDeletes(this.ttl, found, now, deletes)) {
+            lastFound = found;
+            candidates.add(found);
           } else {
-            expires.add(found_key);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("getRowKeyBefore: " + found_key + ": expired, skipped");
-            }
+            // It's expired.
+            Store.expiredOrDeleted(set, found);
           }
         } else {
-          deletes.add(found_key);
+          // We are encountering items in reverse.  We may have just added
+          // an item to candidates that this later item deletes.  Check.  If we
+          // found something in candidates, remove it from the set.
+          if (Store.handleDeletes(found, candidates, deletes)) {
+            remove(set, found);
+          }
         }
       }
     } else {
@@ -540,223 +529,94 @@
       // the very last row's worth of keys in the headMap, because any 
       // smaller acceptable candidate keys would have caused us to start
       // our search earlier in the list, and we wouldn't be searching here.
-      SortedMap<HStoreKey, byte[]> thisRowTailMap = 
-        headMap.tailMap(new HStoreKey(headMap.lastKey().getRow()));
-      Iterator<HStoreKey> key_iterator = thisRowTailMap.keySet().iterator();
+      SortedSet<KeyValue> rowTailMap = 
+        headSet.tailSet(headSet.last().cloneRow(HConstants.LATEST_TIMESTAMP));
+      Iterator<KeyValue> i = rowTailMap.iterator();
       do {
-        HStoreKey found_key = key_iterator.next();
-        if (HLogEdit.isDeleted(thisRowTailMap.get(found_key))) {
-          Store.handleDeleted(found_key, candidateKeys, deletes);
+        KeyValue found = i.next();
+        if (found.isDeleteType()) {
+          Store.handleDeletes(found, candidates, deletes);
         } else {
           if (ttl == HConstants.FOREVER ||
-              now < found_key.getTimestamp() + ttl ||
-              !deletes.contains(found_key)) {
-            candidateKeys.put(stripTimestamp(found_key), 
-              Long.valueOf(found_key.getTimestamp()));
+              now < found.getTimestamp() + ttl ||
+              !deletes.contains(found)) {
+            candidates.add(found);
           } else {
-            expires.add(found_key);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("internalGetRowKeyAtOrBefore: " + found_key +
-                ": expired, skipped");
-            }
+            Store.expiredOrDeleted(set, found);
           }
         }
-      } while (key_iterator.hasNext());
+      } while (i.hasNext());
     }
   }
-  
-  static HStoreKey stripTimestamp(HStoreKey key) {
-    return new HStoreKey(key.getRow(), key.getColumn());
-  }
-  
+
   /*
    * Examine a single map for the desired key.
    *
    * TODO - This is kinda slow.  We need a data structure that allows for 
    * proximity-searches, not just precise-matches.
    * 
-   * @param map
+   * @param set
    * @param key
-   * @param numVersions
-   * @param deletes
-   * @return Ordered list of items found in passed <code>map</code>.  If no
-   * matching values, returns an empty list (does not return null).
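+   * @param versions Passed to Store.doKeyValue for each matching entry.
+   * @param keyvalues Passed to Store.doKeyValue; presumably where results
+   * are accumulated (this method has no separate 'results' parameter).
+   * @param deletes Pass a Set that has a Comparator that ignores key type.
+   * @param now
+   * @return True if enough versions (as written, always returns false).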
    */
-  private ArrayList<Cell> get(final SortedMap<HStoreKey, byte []> map,
-      final HStoreKey key, final int numVersions, final Set<HStoreKey> deletes,
+  private boolean get(final ConcurrentSkipListSet<KeyValue> set,
+      final KeyValue key, final int versions,
+      final List<KeyValue> keyvalues,
+      final NavigableSet<KeyValue> deletes,
       final long now) {
-    ArrayList<Cell> result = new ArrayList<Cell>();
-    List<HStoreKey> victims = new ArrayList<HStoreKey>();
-    SortedMap<HStoreKey, byte[]> tailMap = map.tailMap(key);
-    for (Map.Entry<HStoreKey, byte[]> es : tailMap.entrySet()) {
-      HStoreKey itKey = es.getKey();
-      if (itKey.matchesRowCol(key)) {
-        if (!isDeleted(es.getValue())) {
-          // Filter out expired results
-          if (Store.notExpiredAndNotInDeletes(ttl, itKey, now, deletes)) {
-            result.add(new Cell(tailMap.get(itKey), itKey.getTimestamp()));
-            if (numVersions > 0 && result.size() >= numVersions) {
-              break;
-            }
-          } else {
-            addVictim(victims, itKey);
-          }
-        } else {
-          // Cell holds a delete value.
-          deletes.add(itKey);
+    NavigableSet<KeyValue> tailset = set.tailSet(key);
+    if (tailset.isEmpty()) {
+      return false;
+    }
+    boolean enoughVersions = false;
+    for (KeyValue kv : tailset) {
+      if (this.comparator.matchingRowColumn(kv, key)) {
+        if (Store.doKeyValue(kv, versions, deletes, now, this.ttl, keyvalues,
+            tailset)) {
+          break;
         }
       } else {
         // By L.N. HBASE-684, map is sorted, so we can't find match any more.
         break;
       }
     }
-    // Remove expired victims from the map.
-    for (HStoreKey v: victims) {
-      map.remove(v);
-    }
-    return result;
-  }
-
-  /*
-   * Add <code>key</code> to the list of 'victims'.
-   * @param victims
-   * @param key
-   */
-  private void addVictim(final List<HStoreKey> victims, final HStoreKey key) {
-    victims.add(key);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(key + ": expired or in deletes, skipped");
-    }
-  }
-
-  /**
-   * Get <code>versions</code> keys matching the origin key's
-   * row/column/timestamp and those of an older vintage.
-   * @param origin Where to start searching.
-   * @param versions How many versions to return. Pass
-   * {@link HConstants.ALL_VERSIONS} to retrieve all.
-   * @param now
-   * @param deletes Accumulating list of deletes
-   * @param columnPattern regex pattern for column matching. if columnPattern
-   * is not null, we use column pattern to match columns. And the columnPattern
-   * only works when origin's column is null or its length is zero.
-   * @return Ordered list of <code>versions</code> keys going from newest back.
-   * @throws IOException
-   */
-  List<HStoreKey> getKeys(final HStoreKey origin, final int versions,
-      final Set<HStoreKey> deletes, final long now, 
-      final Pattern columnPattern) {
-    this.lock.readLock().lock();
-    try {
-      List<HStoreKey> results;
-      synchronized (memcache) {
-        results = 
-          getKeys(this.memcache, origin, versions, deletes, now, columnPattern);
-      }
-      synchronized (snapshot) {
-        results.addAll(results.size(), getKeys(snapshot, origin,
-            versions == HConstants.ALL_VERSIONS ? versions :
-              (versions - results.size()), deletes, now, columnPattern));
-      }
-      return results;
-    } finally {
-      this.lock.readLock().unlock();
-    }
+    return enoughVersions;
   }
 
   /*
-   * @param origin Where to start searching.
-   * @param versions How many versions to return. Pass
-   * {@link HConstants.ALL_VERSIONS} to retrieve all.
-   * @param now
-   * @param deletes
-   * @param columnPattern regex pattern for column matching. if columnPattern
-   * is not null, we use column pattern to match columns. And the columnPattern
-   * only works when origin's column is null or its length is zero.
-   * @return List of all keys that are of the same row and column and of
-   * equal or older timestamp.  If no keys, returns an empty List. Does not
-   * return null.
-   */
-  private List<HStoreKey> getKeys(final SortedMap<HStoreKey,
-      byte []> map, final HStoreKey origin, final int versions,
-      final Set<HStoreKey> deletes, final long now, 
-      final Pattern columnPattern) {
-    List<HStoreKey> result = new ArrayList<HStoreKey>();
-    List<HStoreKey> victims = new ArrayList<HStoreKey>();
-    SortedMap<HStoreKey, byte []> tailMap = map.tailMap(origin);
-    for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
-      HStoreKey key = es.getKey();
-      // if there's no column name, then compare rows and timestamps
-      if (origin.getColumn() != null && origin.getColumn().length == 0) {
-        // if the current and origin row don't match, then we can jump
-        // out of the loop entirely.
-        if (!HStoreKey.equalsTwoRowKeys( key.getRow(), origin.getRow())) {
-          break;
-        }
-        // if the column pattern is not null, we use it for column matching.
-        // we will skip the keys whose column doesn't match the pattern.
-        if (columnPattern != null) {
-          if (!(columnPattern.matcher(Bytes.toString(key.getColumn())).matches())) {
-            continue;
-          }
-        }
-        // if the rows match but the timestamp is newer, skip it so we can
-        // get to the ones we actually want.
-        if (key.getTimestamp() > origin.getTimestamp()) {
-          continue;
-        }
-      } else { // compare rows and columns
-        // if the key doesn't match the row and column, then we're done, since 
-        // all the cells are ordered.
-        if (!key.matchesRowCol(origin)) {
-          break;
-        }
-      }
-      if (!isDeleted(es.getValue())) {
-        if (Store.notExpiredAndNotInDeletes(this.ttl, key, now, deletes)) {
-          result.add(key);
-          if (versions > 0 && result.size() >= versions) {
-            break;
-          }
-        } else {
-          addVictim(victims, key);
-        }
-      } else {
-        // Delete
-        deletes.add(key);
+   * @param set
+   * @param kv This is a delete record.  Remove anything behind this of same
+   * r/c/ts.
+   * @return True if we removed anything.
+   */
+  private boolean remove(final NavigableSet<KeyValue> set, final KeyValue kv) {
+    SortedSet<KeyValue> s = set.tailSet(kv);
+    if (s.isEmpty()) {
+      return false;
+    }
+    boolean removed = false;
+    for (KeyValue k: s) {
+      if (this.comparatorIgnoreType.compare(k, kv) == 0) {
+        // Same r/c/ts.  Remove it.
+        s.remove(k);
+        removed = true;
+        continue;
       }
+      break;
     }
-    // Clean expired victims from the map.
-    for (HStoreKey v: victims) {
-       map.remove(v);
-    }
-    return result;
-  }
-
-  /**
-   * @param key
-   * @return True if an entry and its content is {@link HGlobals.deleteBytes}.
-   * Use checking values in store. On occasion the memcache has the fact that
-   * the cell has been deleted.
-   */
-  boolean isDeleted(final HStoreKey key) {
-    return isDeleted(this.memcache.get(key)) ||
-      (this.snapshot != null && isDeleted(this.snapshot.get(key)));
-  }
-  
-  /*
-   * @param b Cell value.
-   * @return True if this is a delete value.
-   */
-  private boolean isDeleted(final byte [] b) {
-    return HLogEdit.isDeleted(b);
+    return removed;
   }
 
   /**
    * @return a scanner over the keys in the Memcache
    */
   InternalScanner getScanner(long timestamp,
-    final byte [][] targetCols, final byte [] firstRow)
+    final NavigableSet<byte []> targetCols, final byte [] firstRow)
   throws IOException {
     this.lock.readLock().lock();
     try {
@@ -772,89 +632,71 @@
   //////////////////////////////////////////////////////////////////////////////
 
   private class MemcacheScanner extends HAbstractScanner {
-    private byte [] currentRow;
-    private Set<byte []> columns = null;
+    private KeyValue current;
+    private final NavigableSet<byte []> columns;
+    private final NavigableSet<KeyValue> deletes;
+    private final Map<KeyValue, Counter> versionCounter;
+    private final long now = System.currentTimeMillis();
 
-    MemcacheScanner(final long timestamp, final byte [] targetCols[],
+    MemcacheScanner(final long timestamp, final NavigableSet<byte []> columns,
       final byte [] firstRow)
     throws IOException {
       // Call to super will create ColumnMatchers and whether this is a regex
       // scanner or not.  Will also save away timestamp.  Also sorts rows.
-      super(timestamp, targetCols);
-      this.currentRow = firstRow;
+      super(timestamp, columns);
+      this.deletes = new TreeSet<KeyValue>(comparatorIgnoreType);
+      this.versionCounter =
+        new TreeMap<KeyValue, Counter>(comparatorIgnoreTimestamp);
+      this.current = KeyValue.createFirstOnRow(firstRow, timestamp);
       // If we're being asked to scan explicit columns rather than all in 
       // a family or columns that match regexes, cache the sorted array of
       // columns.
-      this.columns = null;
-      if (!isWildcardScanner()) {
-        this.columns = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
-        for (int i = 0; i < targetCols.length; i++) {
-          this.columns.add(targetCols[i]);
-        }
-      }
+      this.columns = isWildcardScanner()? null: columns;
     }
 
     @Override
-    public boolean next(HStoreKey key, SortedMap<byte [], Cell> results)
+    public boolean next(final List<KeyValue> keyvalues)
     throws IOException {
       if (this.scannerClosed) {
         return false;
       }
-      // This is a treemap rather than a Hashmap because then I can have a
-      // byte array as key -- because I can independently specify a comparator.
-      Map<byte [], Long> deletes =
-        new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
-      // Catch all row results in here.  These results are ten filtered to
-      // ensure they match column name regexes, or if none, added to results.
-      Map<byte [], Cell> rowResults =
-        new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
-      if (results.size() > 0) {
-        results.clear();
-      }
-      long latestTimestamp = -1;
-      while (results.size() <= 0 && this.currentRow != null) {
-        if (deletes.size() > 0) {
+      while (keyvalues.isEmpty() && this.current != null) {
+        // Deletes are per row.
+        if (!deletes.isEmpty()) {
           deletes.clear();
         }
-        if (rowResults.size() > 0) {
-          rowResults.clear();
-        }
-        key.setRow(this.currentRow);
-        key.setVersion(this.timestamp);
-        getFull(key, isWildcardScanner() ? null : this.columns, 1, deletes,
-            rowResults);
-        for (Map.Entry<byte [], Long> e: deletes.entrySet()) {
-          rowResults.put(e.getKey(),
-            new Cell(HLogEdit.DELETED_BYTES, e.getValue().longValue()));
+        if (!versionCounter.isEmpty()) {
+          versionCounter.clear();
         }
-        for (Map.Entry<byte [], Cell> e: rowResults.entrySet()) {
-          byte [] column = e.getKey();
-          Cell c = e.getValue();
+        // The getFull will take care of expired and deletes inside memcache.
+        // The first getFull when row is the special empty bytes will return
+        // nothing so we go around again.  Alternative is calling a getNextRow
+        // if row is null but that looks like it would take same amount of work
+        // so leave it for now.
+        getFull(this.current, isWildcardScanner()? null: this.columns, null, 1,
+          versionCounter, deletes, keyvalues, this.now);
+        for (KeyValue bb: keyvalues) {
           if (isWildcardScanner()) {
             // Check the results match.  We only check columns, not timestamps.
             // We presume that timestamps have been handled properly when we
             // called getFull.
-            if (!columnMatch(column)) {
-              continue;
+            if (!columnMatch(bb)) {
+              keyvalues.remove(bb);
             }
           }
-          // We should never return HConstants.LATEST_TIMESTAMP as the time for
-          // the row. As a compromise, we return the largest timestamp for the
-          // entries that we find that match.
-          if (c.getTimestamp() != HConstants.LATEST_TIMESTAMP &&
-              c.getTimestamp() > latestTimestamp) {
-            latestTimestamp = c.getTimestamp();
-          }
-          results.put(column, c);
         }
-        this.currentRow = getNextRow(this.currentRow);
-      }
-      // Set the timestamp to the largest one for the row if we would otherwise
-      // return HConstants.LATEST_TIMESTAMP
-      if (key.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
-        key.setVersion(latestTimestamp);
+        // Add any deletes found so they are available to the StoreScanner#next.
+        if (!this.deletes.isEmpty()) {
+          keyvalues.addAll(deletes);
+        }
+        this.current = getNextRow(this.current);
+        // Change current to be column-less and to have the scanner's now.  We
+        // do this because first item on 'next row' may not have the scanner's
+        // now time, which will cause trouble down in getFull; same reason for
+        // no column.
+        if (this.current != null) this.current = this.current.cloneRow(this.now);
       }
-      return results.size() > 0;
+      return !keyvalues.isEmpty();
     }
 
     public void close() {
@@ -871,8 +713,10 @@
    * allows you get 'deep size' on objects.
    * @param args
    * @throws InterruptedException
+   * @throws IOException 
    */
-  public static void main(String [] args) throws InterruptedException {
+  public static void main(String [] args)
+  throws InterruptedException, IOException {
     RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
     LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
       runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
@@ -881,28 +725,27 @@
     // TODO: x32 vs x64
     long size = 0;
     final int count = 10000;
+    byte [] column = Bytes.toBytes("col:umn");
     for (int i = 0; i < count; i++) {
-      size += memcache1.add(new HStoreKey(Bytes.toBytes(i)),
-        HConstants.EMPTY_BYTE_ARRAY);
+      // Give each its own ts
+      size += memcache1.add(new KeyValue(Bytes.toBytes(i), column, i));
     }
     LOG.info("memcache1 estimated size=" + size);
     for (int i = 0; i < count; i++) {
-      size += memcache1.add(new HStoreKey(Bytes.toBytes(i)),
-        HConstants.EMPTY_BYTE_ARRAY);
+      size += memcache1.add(new KeyValue(Bytes.toBytes(i), column, i));
     }
     LOG.info("memcache1 estimated size (2nd loading of same data)=" + size);
     // Make a variably sized memcache.
     Memcache memcache2 = new Memcache();
     for (int i = 0; i < count; i++) {
-      byte [] b = Bytes.toBytes(i);
-      size += memcache2.add(new HStoreKey(b, b),
-        new byte [i]);
+      size += memcache2.add(new KeyValue(Bytes.toBytes(i), column, i,
+        new byte[i]));
     }
     LOG.info("memcache2 estimated size=" + size);
     final int seconds = 30;
     LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
     for (int i = 0; i < seconds; i++) {
-      Thread.sleep(1000);
+      // Thread.sleep(1000);
     }
     LOG.info("Exiting.");
   }