You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2009/04/12 12:39:58 UTC

svn commit: r764289 [5/8] - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/filter/ src/java/org/apache/hadoop/hbase/io/ src/java/org/apache/hadoop/hbase/io/hfile/ s...

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=764289&r1=764288&r2=764289&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java Sun Apr 12 10:39:55 2009
@@ -22,21 +22,20 @@
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.NavigableSet;
 import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Pattern;
@@ -50,17 +49,16 @@
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
-import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.SequenceFile;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.HRegion.Counter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
 
@@ -89,6 +87,12 @@
  */
 public class Store implements HConstants {
   static final Log LOG = LogFactory.getLog(Store.class);
+  /**
+   * Comparator that looks at columns and compares their family portions.
+   * Presumes columns have already been checked for presence of delimiter.
+   * If no delimiter present, presume the buffer holds a store name so no need
+   * of a delimiter.
+   */
   protected final Memcache memcache;
   // This stores directory in the filesystem.
   private final Path homedir;
@@ -132,10 +136,9 @@
   private final boolean bloomfilter;
   private final Compression.Algorithm compression;
   
-  // Comparing HStoreKeys in byte arrays.
-  final HStoreKey.StoreKeyComparator rawcomparator;
-  // Comparing HStoreKey objects.
-  final Comparator<HStoreKey> comparator;
+  // Comparing KeyValues
+  final KeyValue.KVComparator comparator;
+  final KeyValue.KVComparator comparatorIgnoringType;
 
   /**
    * Constructor
@@ -151,7 +154,6 @@
    * failed.  Can be null.
    * @throws IOException
    */
-  @SuppressWarnings("unchecked")
   protected Store(Path basedir, HRegionInfo info, HColumnDescriptor family,
     FileSystem fs, Path reconstructionLog, HBaseConfiguration conf,
     final Progressable reporter)
@@ -165,23 +167,16 @@
     this.bloomfilter = family.isBloomfilter();
     this.blocksize = family.getBlocksize();
     this.compression = family.getCompression();
-    this.rawcomparator = info.isRootRegion()?
-      new HStoreKey.RootStoreKeyComparator(): info.isMetaRegion()?
-        new HStoreKey.MetaStoreKeyComparator():
-          new HStoreKey.StoreKeyComparator();
-    this.comparator = info.isRootRegion()?
-      new HStoreKey.HStoreKeyRootComparator(): info.isMetaRegion()?
-        new HStoreKey.HStoreKeyMetaComparator():
-          new HStoreKey.HStoreKeyComparator();
+    this.comparator = info.getComparator();
+    this.comparatorIgnoringType = this.comparator.getComparatorIgnoringType();
     // getTimeToLive returns ttl in seconds.  Convert to milliseconds.
     this.ttl = family.getTimeToLive();
     if (ttl != HConstants.FOREVER) {
       this.ttl *= 1000;
     }
-    this.memcache = new Memcache(this.ttl, this.comparator, this.rawcomparator);
+    this.memcache = new Memcache(this.ttl, this.comparator);
     this.compactionDir = HRegion.getCompactionDir(basedir);
-    this.storeName = Bytes.toBytes(this.regioninfo.getEncodedName() + "/" +
-      Bytes.toString(this.family.getName()));
+    this.storeName = this.family.getName();
     this.storeNameStr = Bytes.toString(this.storeName);
 
     // By default, we compact if an HStore has more than
@@ -293,17 +288,10 @@
     // TODO: This could grow large and blow heap out.  Need to get it into
     // general memory usage accounting.
     long maxSeqIdInLog = -1;
-    NavigableMap<HStoreKey, byte []> reconstructedCache =
-      new TreeMap<HStoreKey, byte []>(this.comparator);
-    SequenceFile.Reader logReader = null;
-    try {
-      logReader = new SequenceFile.Reader(this.fs, reconstructionLog, this.conf);
-    } catch (IOException e) {
-      LOG.warn("Failed opening reconstruction log though check for null-size passed. " +
-        "POSSIBLE DATA LOSS!! Soldiering on", e);
-      return;
-    }
-
+    ConcurrentSkipListSet<KeyValue> reconstructedCache =
+      Memcache.createSet(this.comparator);
+    SequenceFile.Reader logReader = new SequenceFile.Reader(this.fs,
+      reconstructionLog, this.conf);
     try {
       HLogKey key = new HLogKey();
       HLogEdit val = new HLogEdit();
@@ -318,16 +306,15 @@
           skippedEdits++;
           continue;
         }
-        // Check this edit is for me. Also, guard against writing
-        // Check this edit is for me. Also, guard against writing the special
         // METACOLUMN info such as HBASE::CACHEFLUSH entries
-        byte [] column = val.getColumn();
-        if (val.isTransactionEntry() || Bytes.equals(column, HLog.METACOLUMN)
-            || !Bytes.equals(key.getRegionName(), regioninfo.getRegionName())
-            || !HStoreKey.matchingFamily(family.getName(), column)) {
+        KeyValue kv = val.getKeyValue();
+        if (val.isTransactionEntry() || kv.matchingColumnNoDelimiter(HLog.METACOLUMN) ||
+          !Bytes.equals(key.getRegionName(), regioninfo.getRegionName()) ||
+          !kv.matchingFamily(family.getName())) {
           continue;
         }
-        HStoreKey k = new HStoreKey(key.getRow(), column, val.getTimestamp());
-        reconstructedCache.put(k, val.getVal());
+        reconstructedCache.add(kv);
         editsCount++;
         // Every 2k edits, tell the reporter we're making progress.
         // Have seen 60k edits taking 3minutes to complete.
@@ -393,14 +380,13 @@
   /**
    * Adds a value to the memcache
    * 
-   * @param key
-   * @param value
+   * @param kv
    * @return memcache size delta
    */
-  protected long add(HStoreKey key, byte[] value) {
+  protected long add(final KeyValue kv) {
     lock.readLock().lock();
     try {
-      return this.memcache.add(key, value);
+      return this.memcache.add(kv);
     } finally {
       lock.readLock().unlock();
     }
@@ -456,7 +442,7 @@
   boolean flushCache(final long logCacheFlushId) throws IOException {
     // Get the snapshot to flush.  Presumes that a call to
     // this.memcache.snapshot() has happened earlier up in the chain.
-    SortedMap<HStoreKey, byte []> cache = this.memcache.getSnapshot();
+    ConcurrentSkipListSet<KeyValue> cache = this.memcache.getSnapshot();
     // If an exception happens flushing, we let it out without clearing
     // the memcache snapshot.  The old snapshot will be returned when we say
     // 'snapshot', the next time flush comes around.
@@ -476,7 +462,7 @@
    * @return StoreFile created.
    * @throws IOException
    */
-  private StoreFile internalFlushCache(final SortedMap<HStoreKey, byte []> cache,
+  private StoreFile internalFlushCache(final ConcurrentSkipListSet<KeyValue> cache,
     final long logCacheFlushId)
   throws IOException {
     HFile.Writer writer = null;
@@ -494,13 +480,11 @@
       writer = getWriter();
       int entries = 0;
       try {
-        for (Map.Entry<HStoreKey, byte []> es: cache.entrySet()) {
-          HStoreKey curkey = es.getKey();
-          byte[] bytes = es.getValue();
-          if (!isExpired(curkey, ttl, now)) {
-            writer.append(curkey.getBytes(), bytes);
+        for (KeyValue kv: cache) {
+          if (!isExpired(kv, ttl, now)) {
+            writer.append(kv);
             entries++;
-            flushed += this.memcache.heapSize(curkey, bytes, null);
+            flushed += this.memcache.heapSize(kv, true);
           }
         }
         // B. Write out the log sequence number that corresponds to this output
@@ -528,7 +512,7 @@
    */
   HFile.Writer getWriter() throws IOException {
     return StoreFile.getWriter(this.fs, this.homedir, this.blocksize,
-        this.compression, this.rawcomparator, this.bloomfilter);
+        this.compression, this.comparator.getRawComparator(), this.bloomfilter);
   }
 
   /*
@@ -540,7 +524,7 @@
    * @return Count of store files.
    */
   private int updateStorefiles(final long logCacheFlushId,
-    final StoreFile sf, final SortedMap<HStoreKey, byte []> cache)
+    final StoreFile sf, final NavigableSet<KeyValue> cache)
   throws IOException {
     int count = 0;
     this.lock.writeLock().lock();
@@ -812,7 +796,7 @@
    * @param done Which readers are done
    * @return The lowest current key in passed <code>rdrs</code>
    */
-  private int getLowestKey(final HFileScanner [] rdrs, final ByteBuffer [] keys,
+  private int getLowestKey(final HFileScanner [] rdrs, final KeyValue [] keys,
       final boolean [] done) {
     int lowestKey = -1;
     for (int i = 0; i < rdrs.length; i++) {
@@ -822,10 +806,7 @@
       if (lowestKey < 0) {
         lowestKey = i;
       } else {
-        RawComparator<byte []> c = rdrs[i].getReader().getComparator();
-        if (c.compare(keys[i].array(), keys[i].arrayOffset(), keys[i].limit(),
-            keys[lowestKey].array(), keys[lowestKey].arrayOffset(),
-              keys[lowestKey].limit()) < 0) {
+        if (this.comparator.compare(keys[i], keys[lowestKey]) < 0) {
           lowestKey = i;
         }
       }
@@ -850,9 +831,8 @@
   throws IOException {
     // Reverse order so newest store file is first.
     StoreFile[] files = reverse(pReaders);
-    HFileScanner[] rdrs = new HFileScanner[files.length];
-    ByteBuffer[] keys = new ByteBuffer[rdrs.length];
-    ByteBuffer[] vals = new ByteBuffer[rdrs.length];
+    HFileScanner [] rdrs = new HFileScanner[files.length];
+    KeyValue [] kvs = new KeyValue[rdrs.length];
     boolean[] done = new boolean[rdrs.length];
     // Now, advance through the readers in order. This will have the
     // effect of a run-time sort of the entire dataset.
@@ -863,29 +843,26 @@
       if (done[i]) {
         numDone++;
       } else {
-        keys[i] = rdrs[i].getKey();
-        vals[i] = rdrs[i].getValue();
+        kvs[i] = rdrs[i].getKeyValue();
       }
     }
 
     long now = System.currentTimeMillis();
     int timesSeen = 0;
-    HStoreKey lastSeen = new HStoreKey();
-    HStoreKey lastDelete = null;
+    KeyValue lastSeen = KeyValue.LOWESTKEY;
+    KeyValue lastDelete = null;
+    int maxVersions = family.getMaxVersions();
     while (numDone < done.length) {
       // Get lowest key in all store files.
-      int lowestKey = getLowestKey(rdrs, keys, done);
-      // TODO: Suboptimal. And below where we are going from ByteBuffer to
-      // byte array. FIX!! Can we get rid of HSK instantiations?
-      HStoreKey hsk = HStoreKey.create(keys[lowestKey]);
+      int lowestKey = getLowestKey(rdrs, kvs, done);
+      KeyValue kv = kvs[lowestKey];
       // If its same row and column as last key, increment times seen.
-      if (HStoreKey.equalsTwoRowKeys(lastSeen.getRow(), hsk.getRow())
-          && Bytes.equals(lastSeen.getColumn(), hsk.getColumn())) {
+      if (this.comparator.matchingRowColumn(lastSeen, kv)) {
         timesSeen++;
         // Reset last delete if not exact timestamp -- lastDelete only stops
         // exactly the same key making it out to the compacted store file.
-        if (lastDelete != null
-            && lastDelete.getTimestamp() != hsk.getTimestamp()) {
+        if (lastDelete != null &&
+            lastDelete.getTimestamp() != kv.getTimestamp()) {
           lastDelete = null;
         }
       } else {
@@ -894,26 +871,25 @@
       }
 
       // Don't write empty rows or columns. Only remove cells on major
-      // compaction. Remove if expired of > VERSIONS
-      if (hsk.getRow().length != 0 && hsk.getColumn().length != 0) {
-        ByteBuffer value = vals[lowestKey];
+      // compaction. Remove if expired or > VERSIONS
+      if (kv.nonNullRowAndColumn()) {
         if (!majorCompaction) {
           // Write out all values if not a major compaction.
-          compactedOut.append(hsk.getBytes(), Bytes.toBytes(value));
+          compactedOut.append(kv);
         } else {
           boolean expired = false;
           boolean deleted = false;
-          if (timesSeen <= family.getMaxVersions()
-              && !(expired = isExpired(hsk, ttl, now))) {
+          if (timesSeen <= maxVersions && !(expired = isExpired(kv, ttl, now))) {
             // If this value key is same as a deleted key, skip
-            if (lastDelete != null && hsk.equals(lastDelete)) {
+            if (lastDelete != null &&
+                this.comparator.compare(kv, lastDelete) == 0) {
               deleted = true;
-            } else if (HLogEdit.isDeleted(value)) {
+            } else if (kv.isDeleteType()) {
               // If a deleted value, skip
               deleted = true;
-              lastDelete = hsk;
+              lastDelete = kv;
             } else {
-              compactedOut.append(hsk.getBytes(), Bytes.toBytes(value));
+              compactedOut.append(kv);
             }
           }
           if (expired || deleted) {
@@ -925,7 +901,7 @@
       }
 
       // Update last-seen items
-      lastSeen = hsk;
+      lastSeen = kv;
 
       // Advance the smallest key. If that reader's all finished, then
       // mark it as done.
@@ -934,8 +910,7 @@
         rdrs[lowestKey] = null;
         numDone++;
       } else {
-        keys[lowestKey] = rdrs[lowestKey].getKey();
-        vals[lowestKey] = rdrs[lowestKey].getValue();
+        kvs[lowestKey] = rdrs[lowestKey].getKeyValue();
       }
     }
   }
@@ -1024,101 +999,138 @@
    * row and timestamp, but not a column name.
    *
    * The returned object should map column names to Cells.
+   * @param origin Where to start searching.  Specifies a row and timestamp.
+   * Columns are specified in following arguments.
+   * @param columns Can be null which means get all
+   * @param columnPattern Can be null.
+   * @param numVersions
+   * @param versionsCounter Can be null.
+   * @param keyvalues
+   * @throws IOException
    */
-  void getFull(HStoreKey key, final Set<byte []> columns,
-      final int numVersions, Map<byte [], Cell> results)
+  public void getFull(KeyValue key, final NavigableSet<byte []> columns,
+      final Pattern columnPattern,
+      final int numVersions, Map<KeyValue, HRegion.Counter> versionsCounter,
+      List<KeyValue> keyvalues, final long now)
   throws IOException {
-    int versions = versionsToReturn(numVersions);
-    // This is map of columns to timestamp
-    Map<byte [], Long> deletes =
-      new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
     // if the key is null, we're not even looking for anything. return.
     if (key == null) {
       return;
     }
-    
+    int versions = versionsToReturn(numVersions);
+    NavigableSet<KeyValue> deletes =
+      new TreeSet<KeyValue>(this.comparatorIgnoringType);
+    // Create a Map that has results by column so we can keep count of versions.
+    // It duplicates columns but doing check of columns, we don't want to make
+    // column set each time.
     this.lock.readLock().lock();
-    // get from the memcache first.
-    this.memcache.getFull(key, columns, versions, deletes, results);
     try {
+      // get from the memcache first.
+      if (this.memcache.getFull(key, columns, columnPattern, versions,
+          versionsCounter, deletes, keyvalues, now)) {
+        // May have gotten enough results, enough to return.
+        return;
+      }
       Map<Long, StoreFile> m = this.storefiles.descendingMap();
       for (Iterator<Map.Entry<Long, StoreFile>> i = m.entrySet().iterator();
           i.hasNext();) {
-        getFullFromStoreFile(i.next().getValue(), key, columns, versions, deletes, results);
+        if (getFullFromStoreFile(i.next().getValue(), key, columns,
+            columnPattern, versions, versionsCounter, deletes, keyvalues)) {
+          return;
+        }
       }
     } finally {
       this.lock.readLock().unlock();
     }
   }
 
-  private void getFullFromStoreFile(StoreFile f, HStoreKey key, 
-    Set<byte []> columns, int numVersions, Map<byte [], Long> deletes,
-    Map<byte [], Cell> results) 
+  /*
+   * @param f
+   * @param key Where to start searching.  Specifies a row and timestamp.
+   * Columns are specified in following arguments.
+   * @param columns
+   * @param versions
+   * @param versionCounter
+   * @param deletes
+   * @param keyvalues
+   * @return True if we found enough results to satisfy the <code>versions</code>
+   * and <code>columns</code> passed.
+   * @throws IOException
+   */
+  private boolean getFullFromStoreFile(StoreFile f, KeyValue target, 
+    Set<byte []> columns, final Pattern columnPattern, int versions, 
+    Map<KeyValue, HRegion.Counter> versionCounter,
+    NavigableSet<KeyValue> deletes,
+    List<KeyValue> keyvalues) 
   throws IOException {
     long now = System.currentTimeMillis();
     HFileScanner scanner = f.getReader().getScanner();
-    if (!getClosest(scanner, key.getBytes())) {
-      return;
+    if (!getClosest(scanner, target)) {
+      return false;
     }
+    boolean hasEnough = false;
     do {
-      HStoreKey readkey = HStoreKey.create(scanner.getKey());
-      byte[] readcol = readkey.getColumn();
-
-      // if we're looking for this column (or all of them), and there isn't
-      // already a value for this column in the results map or there is a value
-      // but we haven't collected enough versions yet, and the key we
-      // just read matches, then we'll consider it
-      if ((columns == null || columns.contains(readcol)) &&
-          (!results.containsKey(readcol) ||
-            results.get(readcol).getNumValues() < numVersions) &&
-          key.matchesWithoutColumn(readkey)) {
-        // if the value of the cell we're looking at right now is a delete,
-        // we need to treat it differently
-        ByteBuffer value = scanner.getValue();
-        if (HLogEdit.isDeleted(value)) {
-          // if it's not already recorded as a delete or recorded with a more
-          // recent delete timestamp, record it for later
-          if (!deletes.containsKey(readcol)
-              || deletes.get(readcol).longValue() < readkey.getTimestamp()) {
-            deletes.put(readcol, Long.valueOf(readkey.getTimestamp()));
-          }
-        } else if (!(deletes.containsKey(readcol) && deletes.get(readcol)
-            .longValue() >= readkey.getTimestamp())) {
-          // So the cell itself isn't a delete, but there may be a delete
-          // pending from earlier in our search. Only record this result if
-          // there aren't any pending deletes.
-          if (!(deletes.containsKey(readcol) && deletes.get(readcol)
-              .longValue() >= readkey.getTimestamp())) {
-            if (!isExpired(readkey, ttl, now)) {
-              if (!results.containsKey(readcol)) {
-                results.put(readcol, new Cell(value, readkey.getTimestamp()));
-              } else {
-                results.get(readcol).add(Bytes.toBytes(value),
-                    readkey.getTimestamp());
-              }
-            }
-          }
-        }
-      } else if (this.rawcomparator.compareRows(key.getRow(), 0,
-            key.getRow().length,
-          readkey.getRow(), 0, readkey.getRow().length) < 0) {
-        // if we've crossed into the next row, then we can just stop
-        // iterating
+      KeyValue kv = scanner.getKeyValue();
+      // Make sure we have not passed out the row.  If target key has a
+      // column on it, then we are looking explicit key+column combination.  If
+      // we've passed it out, also break.
+      if (target.isEmptyColumn()? !this.comparator.matchingRows(target, kv):
+          !this.comparator.matchingRowColumn(target, kv)) {
+        break;
+      }
+      if (!Store.getFullCheck(this.comparator, target, kv, columns, columnPattern)) {
+        continue;
+      }
+      if (Store.doKeyValue(kv, versions, versionCounter, columns, deletes, now,
+          this.ttl, keyvalues, null)) {
+        hasEnough = true;
         break;
       }
     } while (scanner.next());
+    return hasEnough;
+  }
+
+  /**
+   * Code shared by {@link Memcache#getFull(KeyValue, NavigableSet, Pattern, int, Map, NavigableSet, List, long)}
+   * and {@link #getFullFromStoreFile(StoreFile, KeyValue, Set, Pattern, int, Map, NavigableSet, List)}
+   * @param c
+   * @param target
+   * @param candidate
+   * @param columns
+   * @param columnPattern
+   * @return True if <code>candidate</code> matches column and timestamp.
+   */
+  static boolean getFullCheck(final KeyValue.KVComparator c,
+      final KeyValue target, final KeyValue candidate,
+      final Set<byte []> columns, final Pattern columnPattern) {
+    // Does column match?
+    if (!Store.matchingColumns(candidate, columns)) {
+      return false;
+    }
+    // if the column pattern is not null, we use it for column matching.
+    // we will skip the keys whose column doesn't match the pattern.
+    if (columnPattern != null) {
+      if (!(columnPattern.matcher(candidate.getColumnString()).matches())) {
+        return false;
+      }
+    }
+    if (c.compareTimestamps(target, candidate) > 0)  {
+      return false;
+    }
+    return true; 
   }
 
   /*
    * @param wantedVersions How many versions were asked for.
-   * @return wantedVersions or this families' MAX_VERSIONS.
+   * @return wantedVersions or this family's VERSIONS.
    */
   private int versionsToReturn(final int wantedVersions) {
     if (wantedVersions <= 0) {
       throw new IllegalArgumentException("Number of versions must be > 0");
     }
     // Make sure we do not return more than maximum versions for this store.
-    return wantedVersions > this.family.getMaxVersions()?
+    return wantedVersions > this.family.getMaxVersions() &&
+        wantedVersions != HConstants.ALL_VERSIONS?
       this.family.getMaxVersions(): wantedVersions;
   }
   
@@ -1132,7 +1144,8 @@
    * @return values for the specified versions
    * @throws IOException
    */
-  Cell[] get(final HStoreKey key, final int numVersions) throws IOException {
+  List<KeyValue> get(final KeyValue key, final int numVersions)
+  throws IOException {
     // This code below is very close to the body of the getKeys method.  Any 
     // changes in the flow below should also probably be done in getKeys.
     // TODO: Refactor so same code used.
@@ -1146,78 +1159,48 @@
     // than one version. This List of deletes should not be large since we
     // are only keeping rows and columns that match those set on the get and
     // which have delete values.  If memory usage becomes an issue, could
-    // redo as bloom filter.
-    Set<HStoreKey> deletes = new HashSet<HStoreKey>();
+    // redo as bloom filter.  Use sorted set because test for membership should
+    // be faster than calculating a hash.  Use a comparator that ignores ts.
+    NavigableSet<KeyValue> deletes =
+      new TreeSet<KeyValue>(this.comparatorIgnoringType);
+    List<KeyValue> keyvalues = new ArrayList<KeyValue>();
     this.lock.readLock().lock();
     try {
       // Check the memcache
-      List<Cell> results = this.memcache.get(key, versions, deletes, now);
-      // If we got sufficient versions from memcache, return. 
-      if (results.size() == versions) {
-        return results.toArray(new Cell[results.size()]);
+      if (this.memcache.get(key, versions, keyvalues, deletes, now)) {
+        return keyvalues;
       }
       Map<Long, StoreFile> m = this.storefiles.descendingMap();
-      byte [] keyBytes = key.getBytes();
-      for (Iterator<Map.Entry<Long, StoreFile>> i = m.entrySet().iterator();
-          i.hasNext() && !hasEnoughVersions(versions, results);) {
-        StoreFile f = i.next().getValue();
+      boolean hasEnough = false;
+      for (Map.Entry<Long, StoreFile> e: m.entrySet()) {
+        StoreFile f = e.getValue();
         HFileScanner scanner = f.getReader().getScanner();
-        if (!getClosest(scanner, keyBytes)) {
-          continue;
-        }
-        HStoreKey readkey = HStoreKey.create(scanner.getKey());
-        if (!readkey.matchesRowCol(key)) {
+        if (!getClosest(scanner, key)) {
+          // Move to next file.
           continue;
         }
-        if (get(readkey, scanner.getValue(), versions, results, deletes, now)) {
-          break;
-        }
-        while (scanner.next()) {
-          readkey = HStoreKey.create(scanner.getKey());
-          if (!readkey.matchesRowCol(key)) {
-            break;
-          }
-          if (get(readkey, scanner.getValue(), versions, results, deletes, now)) {
+        do {
+          KeyValue kv = scanner.getKeyValue();
+          // Make sure below matches what happens up in Memcache#get.
+          if (this.comparator.matchingRowColumn(kv, key)) {
+            if (doKeyValue(kv, versions, deletes, now, this.ttl, keyvalues, null)) {
+              hasEnough = true;
+              break;
+            }
+          } else {
+            // Row and column don't match. Must have gone past. Move to next file.
             break;
           }
+        } while (scanner.next());
+        if (hasEnough) {
+          break; // Break out of files loop.
         }
       }
-      return results.size() == 0 ?
-        null : results.toArray(new Cell[results.size()]);
+      return keyvalues.isEmpty()? null: keyvalues;
     } finally {
       this.lock.readLock().unlock();
     }
   }
-  
-  /*
-   * Look at one key/value.
-   * @param key
-   * @param value
-   * @param versions
-   * @param results
-   * @param deletes
-   * @param now
-   * @return True if we have enough versions.
-   */
-  private boolean get(final HStoreKey key, ByteBuffer value,
-      final int versions, final List<Cell> results,
-      final Set<HStoreKey> deletes, final long now) {
-    if (!HLogEdit.isDeleted(value)) {
-      if (notExpiredAndNotInDeletes(this.ttl, key, now, deletes)) {
-        results.add(new Cell(value, key.getTimestamp()));
-      }
-      // Perhaps only one version is wanted.  I could let this
-      // test happen later in the for loop test but it would cost
-      // the allocation of an ImmutableBytesWritable.
-      if (hasEnoughVersions(versions, results)) {
-        return true;
-      }
-    } else {
-      // Is this copy necessary?
-      deletes.add(new HStoreKey(key));
-    }
-    return false;
-  }
 
   /*
    * Small method to check if we are over the max number of versions
@@ -1227,87 +1210,109 @@
    * @param c
    * @return 
    */
-  private boolean hasEnoughVersions(final int versions, final List<Cell> c) {
-    return c.size() >= versions;
+  static boolean hasEnoughVersions(final int versions, final List<KeyValue> c) {
+    return versions > 0 && !c.isEmpty() && c.size() >= versions;
   }
 
-  /**
-   * Get <code>versions</code> of keys matching the origin key's
-   * row/column/timestamp and those of an older vintage.
-   * @param origin Where to start searching.
-   * @param versions How many versions to return. Pass
-   * {@link HConstants#ALL_VERSIONS} to retrieve all.
+  /*
+   * Used when doing getFulls.
+   * @param kv
+   * @param versions
+   * @param versionCounter
+   * @param columns
+   * @param deletes
    * @param now
-   * @param columnPattern regex pattern for column matching. if columnPattern
-   * is not null, we use column pattern to match columns. And the columnPattern
-   * only works when origin's column is null or its length is zero.
-   * @return Matching keys.
-   * @throws IOException
-   */
-  public List<HStoreKey> getKeys(final HStoreKey origin, final int versions,
-    final long now, final Pattern columnPattern)
-  throws IOException {
-    // This code below is very close to the body of the get method.  Any 
-    // changes in the flow below should also probably be done in get.
-    // TODO: Refactor so same code used.
-    Set<HStoreKey> deletes = new HashSet<HStoreKey>();
-    this.lock.readLock().lock();
-    try {
-      // Check the memcache
-      List<HStoreKey> keys =
-        this.memcache.getKeys(origin, versions, deletes, now, columnPattern);
-      // If we got sufficient versions from memcache, return.
-      if (keys.size() >= versions) {
-        return keys;
-      }
-      Map<Long, StoreFile> m = this.storefiles.descendingMap();
-      for (Iterator<Map.Entry<Long, StoreFile>> i = m.entrySet().iterator();
-          i.hasNext() && keys.size() < versions;) {
-        StoreFile f = i.next().getValue();
-        HFileScanner scanner = f.getReader().getScanner();
-        if (!getClosest(scanner, origin.getBytes())) {
-          continue;
+   * @param ttl
+   * @param keyvalues
+   * @param set
+   * @return True if enough versions.
+   */
+  static boolean doKeyValue(final KeyValue kv,
+      final int versions,
+      final Map<KeyValue, Counter> versionCounter,
+      final Set<byte []> columns,
+      final NavigableSet<KeyValue> deletes,
+      final long now, 
+      final long ttl,
+      final List<KeyValue> keyvalues,
+      final SortedSet<KeyValue> set) {
+    boolean hasEnough = false;
+    if (kv.isDeleteType()) {
+      if (!deletes.contains(kv)) {
+        deletes.add(kv);
+      }
+    } else if (!deletes.contains(kv)) {
+      // Skip expired cells
+      if (!isExpired(kv, ttl, now)) {
+        if (HRegion.okToAddResult(kv, versions, versionCounter)) {
+          HRegion.addResult(kv, versionCounter, keyvalues);
+          if (HRegion.hasEnoughVersions(versions, versionCounter, columns)) {
+            hasEnough = true;
+          }
         }
-        do {
-          HStoreKey readkey = HStoreKey.create(scanner.getKey());
-          // if the row and column matches, we might want this one.
-          if (rowMatches(origin, readkey)) {
-            // if the column pattern is not null, we use it for column matching.
-            // we will skip the keys whose column doesn't match the pattern.
-            if (columnPattern != null) {
-              if (!(columnPattern.
-                  matcher(Bytes.toString(readkey.getColumn())).matches())) {
-                continue;
-              }
-            }
-            // if the cell address matches, then we definitely want this key.
-            if (cellMatches(origin, readkey)) {
-              ByteBuffer readval = scanner.getValue();
-              // Store key if isn't deleted or superceded by memcache
-              if (!HLogEdit.isDeleted(readval)) {
-                if (notExpiredAndNotInDeletes(this.ttl, readkey, now, deletes)) {
-                  keys.add(readkey);
-                }
-                if (keys.size() >= versions) {
-                  break;
-                }
-              } else {
-                deletes.add(readkey);
-              }
-            } else {
-              // the cell doesn't match, but there might be more with different
-              // timestamps, so move to the next key
-              continue;
-            }
-          } else {
-            // the row doesn't match, so we've gone too far.
-            break;
+      } else {
+        // Remove the expired.
+        Store.expiredOrDeleted(set, kv);
+      }
+    }
+    return hasEnough;
+  }
+
+  /*
+   * Used when doing get.
+   * @param kv
+   * @param versions
+   * @param deletes
+   * @param now
+   * @param ttl
+   * @param keyvalues
+   * @param set
+   * @return True if enough versions.
+   */
+  static boolean doKeyValue(final KeyValue kv, final int versions,
+      final NavigableSet<KeyValue> deletes,
+      final long now,  final long ttl,
+      final List<KeyValue> keyvalues, final SortedSet<KeyValue> set) {
+    boolean hasEnough = false;
+    if (!kv.isDeleteType()) {
+      // Filter out expired results
+      if (notExpiredAndNotInDeletes(ttl, kv, now, deletes)) {
+        if (!keyvalues.contains(kv)) {
+          keyvalues.add(kv);
+          if (hasEnoughVersions(versions, keyvalues)) {
+            hasEnough = true;
           }
-        } while (scanner.next()); // advance to the next key
+        }
+      } else {
+        if (set != null) {
+          expiredOrDeleted(set, kv);
+        }
       }
-      return keys;
-    } finally {
-      this.lock.readLock().unlock();
+    } else {
+      // Cell holds a delete value.
+      deletes.add(kv);
+    }
+    return hasEnough;
+  }
+
+  /*
+   * Test that the <i>target</i> matches the <i>origin</i>. If the <i>origin</i>
+   * has an empty column, then it just tests row equivalence. Otherwise, it uses
+   * HStoreKey.matchesRowCol().
+   * @param c Comparator to use.
+   * @param origin Key we're testing against
+   * @param target Key we're testing
+   */
+  static boolean matchingRowColumn(final KeyValue.KVComparator c,
+      final KeyValue origin, final KeyValue target) {
+    return origin.isEmptyColumn()? c.matchingRows(target, origin):
+      c.matchingRowColumn(target, origin);
+  }
+
+  static void expiredOrDeleted(final Set<KeyValue> set, final KeyValue kv) {
+    boolean b = set.remove(kv);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(kv.toString() + " expired: " + b);
     }
   }
 
@@ -1316,19 +1321,21 @@
    * preceeds it. WARNING: Only use this method on a table where writes occur 
    * with stricly increasing timestamps. This method assumes this pattern of 
    * writes in order to make it reasonably performant.
-   * @param row
-   * @return Found row
+   * @param targetkey
+   * @return Found keyvalue
    * @throws IOException
    */
-  byte [] getRowKeyAtOrBefore(final byte [] row)
+  KeyValue getRowKeyAtOrBefore(final KeyValue targetkey)
   throws IOException{
-    // Map of HStoreKeys that are candidates for holding the row key that
+    // Map of keys that are candidates for holding the row key that
     // most closely matches what we're looking for. We'll have to update it as
     // deletes are found all over the place as we go along before finally
-    // reading the best key out of it at the end.
-    NavigableMap<HStoreKey, Long> candidateKeys =
-      new TreeMap<HStoreKey, Long>(this.comparator);
-    
+    // reading the best key out of it at the end.   Use a comparator that
+    // ignores key types.  Otherwise, we can't remove deleted items doing
+    // set.remove because of the differing type between insert and delete.
+    NavigableSet<KeyValue> candidates =
+      new TreeSet<KeyValue>(this.comparator.getComparatorIgnoringType());
+
     // Keep a list of deleted cell keys.  We need this because as we go through
     // the store files, the cell with the delete marker may be in one file and
     // the old non-delete cell value in a later store file. If we don't keep
@@ -1337,22 +1344,22 @@
     // This List of deletes should not be large since we are only keeping rows
     // and columns that match those set on the scanner and which have delete
     // values.  If memory usage becomes an issue, could redo as bloom filter.
-    Set<HStoreKey> deletes = new HashSet<HStoreKey>();
+    NavigableSet<KeyValue> deletes =
+      new TreeSet<KeyValue>(this.comparatorIgnoringType);
+    long now = System.currentTimeMillis();
     this.lock.readLock().lock();
     try {
       // First go to the memcache.  Pick up deletes and candidates.
-      this.memcache.getRowKeyAtOrBefore(row, candidateKeys, deletes);
+      this.memcache.getRowKeyAtOrBefore(targetkey, candidates, deletes, now);
       // Process each store file.  Run through from newest to oldest.
-      // This code below is very close to the body of the getKeys method.
       Map<Long, StoreFile> m = this.storefiles.descendingMap();
       for (Map.Entry<Long, StoreFile> e: m.entrySet()) {
         // Update the candidate keys from the current map file
-        rowAtOrBeforeFromStoreFile(e.getValue(), row, candidateKeys, deletes);
+        rowAtOrBeforeFromStoreFile(e.getValue(), targetkey, candidates,
+          deletes, now);
       }
       // Return the best key from candidateKeys
-      byte [] result =
-        candidateKeys.isEmpty()? null: candidateKeys.lastKey().getRow();
-      return result;
+      return candidates.isEmpty()? null: candidates.last();
     } finally {
       this.lock.readLock().unlock();
     }
@@ -1362,123 +1369,92 @@
    * Check an individual MapFile for the row at or before a given key 
    * and timestamp
    * @param f
-   * @param row
-   * @param candidateKeys
+   * @param targetkey
+   * @param candidates Pass a Set with a Comparator that
+   * ignores key Type so we can do Set.remove using a delete, i.e. a KeyValue
+   * with a different Type to the candidate key.
    * @throws IOException
    */
   private void rowAtOrBeforeFromStoreFile(final StoreFile f,
-    final byte [] row, final SortedMap<HStoreKey, Long> candidateKeys,
-    final Set<HStoreKey> deletes)
+    final KeyValue targetkey, final NavigableSet<KeyValue> candidates,
+    final NavigableSet<KeyValue> deletes, final long now)
   throws IOException {
-    HFileScanner scanner = f.getReader().getScanner();
-    // TODO: FIX THIS PROFLIGACY!!!
-    if (!scanner.seekBefore(new HStoreKey(row).getBytes())) {
-      return;
-    }
-    long now = System.currentTimeMillis();
-    HStoreKey startKey = HStoreKey.create(scanner.getKey());
     // if there aren't any candidate keys yet, we'll do some things different 
-    if (candidateKeys.isEmpty()) {
-      rowAtOrBeforeCandidate(startKey, f, row, candidateKeys, deletes, now);
+    if (candidates.isEmpty()) {
+      rowAtOrBeforeCandidate(f, targetkey, candidates, deletes, now);
     } else {
-      rowAtOrBeforeWithCandidates(startKey, f, row, candidateKeys, deletes, now);
+      rowAtOrBeforeWithCandidates(f, targetkey, candidates, deletes, now);
     }
   }
-  
-  /* Find a candidate for row that is at or before passed row in passed
-   * mapfile.
-   * @param startKey First key in the mapfile.
-   * @param map
-   * @param row
-   * @param candidateKeys
-   * @param now
-   * @throws IOException
-   */
-  private void rowAtOrBeforeCandidate(final HStoreKey startKey,
-    final StoreFile f, final byte[] row,
-    final SortedMap<HStoreKey, Long> candidateKeys,
-    final Set<HStoreKey> deletes, final long now) 
-  throws IOException {
-    // if the row we're looking for is past the end of this mapfile, set the
-    // search key to be the last key.  If its a deleted key, then we'll back
-    // up to the row before and return that.
-    HStoreKey finalKey = HStoreKey.create(f.getReader().getLastKey());
-    HStoreKey searchKey = null;
-    if (this.rawcomparator.compareRows(finalKey.getRow(), 0,
-          finalKey.getRow().length,
-        row, 0, row.length) < 0) {
-      searchKey = finalKey;
-    } else {
-      searchKey = new HStoreKey(row);
-      if (this.comparator.compare(searchKey, startKey) < 0) {
-        searchKey = startKey;
-      }
-    }
-    rowAtOrBeforeCandidate(f, searchKey, candidateKeys, deletes, now);
-  }
 
   /* 
    * @param ttlSetting
    * @param hsk
    * @param now
-   * @param deletes
+   * @param deletes A Set whose Comparator ignores Type.
    * @return True if key has not expired and is not in passed set of deletes.
    */
   static boolean notExpiredAndNotInDeletes(final long ttl,
-      final HStoreKey hsk, final long now, final Set<HStoreKey> deletes) {
-    return !isExpired(hsk, ttl, now) &&
-      (deletes == null || !deletes.contains(hsk));
+      final KeyValue key, final long now, final Set<KeyValue> deletes) {
+    return !isExpired(key, ttl, now) && (deletes == null || deletes.isEmpty() ||
+        !deletes.contains(key));
   }
-  
-  static boolean isExpired(final HStoreKey hsk, final long ttl,
+
+  static boolean isExpired(final KeyValue key, final long ttl,
       final long now) {
-    return ttl != HConstants.FOREVER && now > hsk.getTimestamp() + ttl;
+    return ttl != HConstants.FOREVER && now > key.getTimestamp() + ttl;
   }
 
-  /* Find a candidate for row that is at or before passed key, sk, in mapfile.
+  /* Find a candidate for row that is at or before passed key, targetkey, in hfile.
    * @param f
-   * @param sk Key to go search the mapfile with.
-   * @param candidateKeys
+   * @param targetkey Key to go search the hfile with.
+   * @param candidates
    * @param now
    * @throws IOException
    * @see {@link #rowAtOrBeforeCandidate(HStoreKey, org.apache.hadoop.io.MapFile.Reader, byte[], SortedMap, long)}
    */
   private void rowAtOrBeforeCandidate(final StoreFile f,
-    final HStoreKey sk, final SortedMap<HStoreKey, Long> candidateKeys,
-    final Set<HStoreKey> deletes, final long now)
+    final KeyValue targetkey, final NavigableSet<KeyValue> candidates,
+    final NavigableSet<KeyValue> deletes, final long now)
   throws IOException {
-    HStoreKey searchKey = sk;
-    HStoreKey readkey = null;
-    HStoreKey knownNoGoodKey = null;
+    KeyValue search = targetkey;
+    // If the row we're looking for is past the end of this mapfile, set the
+    // search key to be the last key.  If its a deleted key, then we'll back
+    // up to the row before and return that.
+    // TODO: Cache last key as KV over in the file.
+    byte [] lastkey = f.getReader().getLastKey();
+    KeyValue lastKeyValue =
+      KeyValue.createKeyValueFromKey(lastkey, 0, lastkey.length);
+    if (this.comparator.compareRows(lastKeyValue, targetkey) < 0) {
+      search = lastKeyValue;
+    }
+    KeyValue knownNoGoodKey = null;
     HFileScanner scanner = f.getReader().getScanner();
     for (boolean foundCandidate = false; !foundCandidate;) {
       // Seek to the exact row, or the one that would be immediately before it
-      int result = scanner.seekTo(searchKey.getBytes());
+      int result = scanner.seekTo(search.getBuffer(), search.getKeyOffset(),
+        search.getKeyLength());
       if (result < 0) {
         // Not in file.
         continue;
       }
-      HStoreKey deletedOrExpiredRow = null;
+      KeyValue deletedOrExpiredRow = null;
+      KeyValue kv = null;
       do {
-        readkey = HStoreKey.create(scanner.getKey());
-        ByteBuffer value = scanner.getValue();
-        // If we have an exact match on row, and it's not a delete, save this
-        // as a candidate key
-        if (HStoreKey.equalsTwoRowKeys(readkey.getRow(), searchKey.getRow())) {
-          if (!HLogEdit.isDeleted(value)) {
-            if (handleNonDelete(readkey, now, deletes, candidateKeys)) {
+        kv = scanner.getKeyValue();
+        if (this.comparator.compareRows(kv, search) <= 0) {
+          if (!kv.isDeleteType()) {
+            if (handleNonDelete(kv, now, deletes, candidates)) {
               foundCandidate = true;
               // NOTE! Continue.
               continue;
             }
           }
-          HStoreKey copy = addCopyToDeletes(readkey, deletes);
+          deletes.add(kv);
           if (deletedOrExpiredRow == null) {
-            deletedOrExpiredRow = copy;
+            deletedOrExpiredRow = kv;
           }
-        } else if (this.rawcomparator.compareRows(readkey.getRow(), 0,
-              readkey.getRow().length,
-            searchKey.getRow(), 0, searchKey.getRow().length) > 0) {
+        } else if (this.comparator.compareRows(kv, search) > 0) {
           // if the row key we just read is beyond the key we're searching for,
           // then we're done.
           break;
@@ -1486,30 +1462,32 @@
           // So, the row key doesn't match, but we haven't gone past the row
           // we're seeking yet, so this row is a candidate for closest
           // (assuming that it isn't a delete).
-          if (!HLogEdit.isDeleted(value)) {
-            if (handleNonDelete(readkey, now, deletes, candidateKeys)) {
+          if (!kv.isDeleteType()) {
+            if (handleNonDelete(kv, now, deletes, candidates)) {
               foundCandidate = true;
               // NOTE: Continue
               continue;
             }
           }
-          HStoreKey copy = addCopyToDeletes(readkey, deletes);
+          deletes.add(kv);
           if (deletedOrExpiredRow == null) {
-            deletedOrExpiredRow = copy;
+            deletedOrExpiredRow = kv;
           }
         }
       } while(scanner.next() && (knownNoGoodKey == null ||
-          this.comparator.compare(readkey, knownNoGoodKey) < 0));
+          this.comparator.compare(kv, knownNoGoodKey) < 0));
 
       // If we get here and have no candidates but we did find a deleted or
       // expired candidate, we need to look at the key before that
       if (!foundCandidate && deletedOrExpiredRow != null) {
         knownNoGoodKey = deletedOrExpiredRow;
-        if (!scanner.seekBefore(deletedOrExpiredRow.getBytes())) {
-          // Is this right?
+        if (!scanner.seekBefore(deletedOrExpiredRow.getBuffer(),
+            deletedOrExpiredRow.getKeyOffset(),
+            deletedOrExpiredRow.getKeyLength())) {
+          // Not in file -- what can I do now but break?
           break;
         }
-        searchKey = HStoreKey.create(scanner.getKey());
+        search = scanner.getKeyValue();
       } else {
         // No candidates and no deleted or expired candidates. Give up.
         break;
@@ -1520,55 +1498,40 @@
     // without going "past" the key we're searching for. we can just fall
     // through here.
   }
-  
-  /*
-   * @param key Key to copy and add to <code>deletes</code>
-   * @param deletes
-   * @return Instance of the copy added to <code>deletes</code>
-   */
-  private HStoreKey addCopyToDeletes(final HStoreKey key,
-      final Set<HStoreKey> deletes) {
-    HStoreKey copy = new HStoreKey(key);
-    deletes.add(copy);
-    return copy;
-  }
 
-  private void rowAtOrBeforeWithCandidates(final HStoreKey startKey,
-    final StoreFile f, final byte[] row,
-    final SortedMap<HStoreKey, Long> candidateKeys,
-    final Set<HStoreKey> deletes, final long now) 
+  private void rowAtOrBeforeWithCandidates(final StoreFile f,
+    final KeyValue targetkey,
+    final NavigableSet<KeyValue> candidates,
+    final NavigableSet<KeyValue> deletes, final long now) 
   throws IOException {
     // if there are already candidate keys, we need to start our search 
     // at the earliest possible key so that we can discover any possible
     // deletes for keys between the start and the search key.  Back up to start
     // of the row in case there are deletes for this candidate in this mapfile
     // BUT do not backup before the first key in the store file.
-    // TODO: FIX THIS PROFLIGATE OBJECT MAKING!!!
-    HStoreKey firstCandidateKey = candidateKeys.firstKey();
-    byte [] searchKey = null;
-    HStoreKey.StoreKeyComparator c =
-      (HStoreKey.StoreKeyComparator)f.getReader().getComparator();
-    if (c.compareRows(firstCandidateKey.getRow(), startKey.getRow()) < 0) {
-      searchKey = startKey.getBytes();
+    KeyValue firstCandidateKey = candidates.first();
+    KeyValue search = null;
+    if (this.comparator.compareRows(firstCandidateKey, targetkey) < 0) {
+      search = targetkey;
     } else {
-      searchKey = new HStoreKey(firstCandidateKey.getRow()).getBytes();
+      search = firstCandidateKey;
     }
 
     // Seek to the exact row, or the one that would be immediately before it
     HFileScanner scanner = f.getReader().getScanner();
-    int result = scanner.seekTo(searchKey);
+    int result = scanner.seekTo(search.getBuffer(), search.getKeyOffset(),
+      search.getKeyLength());
     if (result < 0) {
       // Key is before start of this file.  Return.
       return;
     }
     do {
-      HStoreKey k = HStoreKey.create(scanner.getKey());
-      ByteBuffer v = scanner.getValue();
+      KeyValue kv = scanner.getKeyValue();
       // if we have an exact match on row, and it's not a delete, save this
       // as a candidate key
-      if (HStoreKey.equalsTwoRowKeys(k.getRow(), row)) {
-        handleKey(k, v, now, deletes, candidateKeys);
-      } else if (this.rawcomparator.compareRows(k.getRow(), row) > 0 ) {
+      if (this.comparator.matchingRows(kv, targetkey)) {
+        handleKey(kv, now, deletes, candidates);
+      } else if (this.comparator.compareRows(kv, targetkey) > 0 ) {
         // if the row key we just read is beyond the key we're searching for,
         // then we're done.
         break;
@@ -1576,109 +1539,61 @@
         // So, the row key doesn't match, but we haven't gone past the row
         // we're seeking yet, so this row is a candidate for closest 
         // (assuming that it isn't a delete).
-        handleKey(k, v, now, deletes, candidateKeys);
+        handleKey(kv, now, deletes, candidates);
       }
     } while(scanner.next());
   }
 
   /*
+   * Used when calculating keys at or just before a passed key.
    * @param readkey
    * @param now
-   * @param deletes
-   * @param candidateKeys
+   * @param deletes Set with Comparator that ignores key type.
+   * @param candidates Set with Comparator that ignores key type.
    */
-  private void handleKey(final HStoreKey readkey, ByteBuffer value,
-      final long now, final Set<HStoreKey> deletes,
-      final SortedMap<HStoreKey, Long> candidateKeys) {
-    if (!HLogEdit.isDeleted(value)) {
-      handleNonDelete(readkey, now, deletes, candidateKeys);
+  private void handleKey(final KeyValue readkey, final long now,
+      final NavigableSet<KeyValue> deletes,
+      final NavigableSet<KeyValue> candidates) {
+    if (!readkey.isDeleteType()) {
+      handleNonDelete(readkey, now, deletes, candidates);
     } else {
-      // Pass copy because readkey will change next time next is called.
-      handleDeleted(new HStoreKey(readkey), candidateKeys, deletes);
+      handleDeletes(readkey, candidates, deletes);
     }
   }
 
   /*
+   * Used when calculating keys at or just before a passed key.
    * @param readkey
    * @param now
-   * @param deletes
-   * @param candidateKeys
+   * @param deletes Set with Comparator that ignores key type.
+   * @param candidates Set with Comparator that ignores key type.
    * @return True if we added a candidate.
    */
-  private boolean handleNonDelete(final HStoreKey readkey, final long now,
-      final Set<HStoreKey> deletes, final Map<HStoreKey, Long> candidateKeys) {
+  private boolean handleNonDelete(final KeyValue readkey, final long now,
+      final NavigableSet<KeyValue> deletes,
+      final NavigableSet<KeyValue> candidates) {
     if (notExpiredAndNotInDeletes(this.ttl, readkey, now, deletes)) {
-      candidateKeys.put(stripTimestamp(readkey),
-        Long.valueOf(readkey.getTimestamp()));
+      candidates.add(readkey);
       return true;
     }
     return false;
   }
 
-  /* Handle keys whose values hold deletes.
+  /**
+   * Handle keys whose values hold deletes.
    * Add to the set of deletes and then if the candidate keys contain any that
-   * might match by timestamp, then check for a match and remove it if it's too
-   * young to survive the delete 
-   * @param k Be careful; if key was gotten from a Mapfile, pass in a copy.
-   * Values gotten by 'nexting' out of Mapfiles will change in each invocation.
-   * @param candidateKeys
+   * might match, then check for a match and remove it.  Implies candidates
+   * is made with a Comparator that ignores key type.
+   * @param k
+   * @param candidates
    * @param deletes
+   * @return True if we removed <code>k</code> from <code>candidates</code>.
    */
-  static void handleDeleted(final HStoreKey k,
-      final SortedMap<HStoreKey, Long> candidateKeys,
-      final Set<HStoreKey> deletes) {
+  static boolean handleDeletes(final KeyValue k,
+      final NavigableSet<KeyValue> candidates,
+      final NavigableSet<KeyValue> deletes) {
     deletes.add(k);
-    HStoreKey strippedKey = stripTimestamp(k);
-    if (candidateKeys.containsKey(strippedKey)) {
-      long bestCandidateTs = 
-        candidateKeys.get(strippedKey).longValue();
-      if (bestCandidateTs <= k.getTimestamp()) {
-        candidateKeys.remove(strippedKey);
-      }
-    }
-  }
-
-  static HStoreKey stripTimestamp(HStoreKey key) {
-    return new HStoreKey(key.getRow(), key.getColumn());
-  }
-    
-  /*
-   * Test that the <i>target</i> matches the <i>origin</i> cell address. If the 
-   * <i>origin</i> has an empty column, then it's assumed to mean any column 
-   * matches and only match on row and timestamp. Otherwise, it compares the
-   * keys with HStoreKey.matchesRowCol().
-   * @param origin The key we're testing against
-   * @param target The key we're testing
-   */
-  private boolean cellMatches(HStoreKey origin, HStoreKey target){
-    // if the origin's column is empty, then we're matching any column
-    if (Bytes.equals(origin.getColumn(), HConstants.EMPTY_BYTE_ARRAY)) {
-      // if the row matches, then...
-      if (HStoreKey.equalsTwoRowKeys(target.getRow(), origin.getRow())) {
-        // check the timestamp
-        return target.getTimestamp() <= origin.getTimestamp();
-      }
-      return false;
-    }
-    // otherwise, we want to match on row and column
-    return target.matchesRowCol(origin);
-  }
-    
-  /*
-   * Test that the <i>target</i> matches the <i>origin</i>. If the <i>origin</i>
-   * has an empty column, then it just tests row equivalence. Otherwise, it uses
-   * HStoreKey.matchesRowCol().
-   * @param origin Key we're testing against
-   * @param target Key we're testing
-   */
-  private boolean rowMatches(final HStoreKey origin, final HStoreKey target){
-    // if the origin's column is empty, then we're matching any column
-    if (Bytes.equals(origin.getColumn(), HConstants.EMPTY_BYTE_ARRAY)) {
-      // if the row matches, then...
-      return HStoreKey.equalsTwoRowKeys(target.getRow(), origin.getRow());
-    }
-    // otherwise, we want to match on row and column
-    return target.matchesRowCol(origin);
+    return candidates.remove(k);
   }
 
   /**
@@ -1701,18 +1616,18 @@
       long maxSize = 0L;
       Long mapIndex = Long.valueOf(0L);
       for (Map.Entry<Long, StoreFile> e: storefiles.entrySet()) {
-        StoreFile curHSF = e.getValue();
+        StoreFile sf = e.getValue();
         if (splitable) {
-          splitable = !curHSF.isReference();
+          splitable = !sf.isReference();
           if (!splitable) {
             // RETURN IN MIDDLE OF FUNCTION!!! If not splitable, just return.
             if (LOG.isDebugEnabled()) {
-              LOG.debug(curHSF +  " is not splittable");
+              LOG.debug(sf +  " is not splittable");
             }
             return null;
           }
         }
-        long size = curHSF.getReader().length();
+        long size = sf.getReader().length();
         if (size > maxSize) {
           // This is the largest one so far
           maxSize = size;
@@ -1726,13 +1641,15 @@
       // the row we want to split on as midkey.
       byte [] midkey = r.midkey();
       if (midkey != null) {
-        HStoreKey mk = HStoreKey.create(midkey);
-        HStoreKey firstKey = HStoreKey.create(r.getFirstKey());
-        HStoreKey lastKey = HStoreKey.create(r.getLastKey());
+        KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length);
+        byte [] fk = r.getFirstKey();
+        KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
+        byte [] lk = r.getLastKey();
+        KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
         // if the midkey is the same as the first and last keys, then we cannot
         // (ever) split this region. 
-        if (HStoreKey.equalsTwoRowKeys(mk.getRow(), firstKey.getRow()) && 
-            HStoreKey.equalsTwoRowKeys( mk.getRow(), lastKey.getRow())) {
+        if (this.comparator.compareRows(mk, firstKey) == 0 && 
+            this.comparator.compareRows(mk, lastKey) == 0) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("cannot split because midkey is the same as first or " +
               "last row");
@@ -1761,7 +1678,8 @@
   /**
    * Return a scanner for both the memcache and the HStore files
    */
-  protected InternalScanner getScanner(long timestamp, byte [][] targetCols,
+  protected InternalScanner getScanner(long timestamp,
+      final NavigableSet<byte []> targetCols,
       byte [] firstRow, RowFilterInterface filter)
   throws IOException {
     lock.readLock().lock();
@@ -1798,22 +1716,23 @@
 
   /*
    * Datastructure that holds size and row to split a file around.
+   * TODO: Take a KeyValue rather than row.
    */
   static class StoreSize {
     private final long size;
-    private final byte[] key;
-    StoreSize(long size, byte[] key) {
+    private final byte [] row;
+
+    StoreSize(long size, byte [] row) {
       this.size = size;
-      this.key = new byte[key.length];
-      System.arraycopy(key, 0, this.key, 0, key.length);
+      this.row = row;
     }
     /* @return the size */
     long getSize() {
       return size;
     }
-    /* @return the key */
-    byte[] getSplitRow() {
-      return key;
+
+    byte [] getSplitRow() {
+      return this.row;
     }
   }
 
@@ -1825,15 +1744,16 @@
    * Convenience method that implements the old MapFile.getClosest on top of
    * HFile Scanners.  getClosest used seek to the asked-for key or just after
    * (HFile seeks to the key or just before).
-   * @param s
-   * @param b
+   * @param s Scanner to use
+   * @param kv Key to find.
    * @return True if we were able to seek the scanner to <code>b</code> or to
    * the key just after.
    * @throws IOException 
    */
-  static boolean getClosest(final HFileScanner s, final byte [] b)
+  static boolean getClosest(final HFileScanner s, final KeyValue kv)
   throws IOException {
-    int result = s.seekTo(b);
+    // Pass offsets to key content of a KeyValue; that's what's in the hfile index.
+    int result = s.seekTo(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
     if (result < 0) {
       // Not in file.  Will the first key do?
       if (!s.seekTo()) {
@@ -1849,4 +1769,25 @@
     }
     return true;
   }
+
+  /**
+   * @param kv
+   * @param columns Can be null
+   * @return True if column matches.
+   */
+  static boolean matchingColumns(final KeyValue kv, final Set<byte []> columns) {
+    if (columns == null) {
+      return true;
+    }
+    // Only instantiate columns if lots of columns to test.
+    if (columns.size() > 100) {
+      return columns.contains(kv.getColumn());
+    }
+    for (byte [] column: columns) {
+      if (kv.matchingColumn(column)) {
+        return true;
+      }
+    }
+    return false;
+  }
 }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=764289&r1=764288&r2=764289&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Sun Apr 12 10:39:55 2009
@@ -32,7 +32,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.HalfHFileReader;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
@@ -254,29 +254,17 @@
 
     @Override
     protected String toStringFirstKey() {
-      String result = "";
-      try {
-        result = HStoreKey.create(getFirstKey()).toString();
-      } catch (IOException e) {
-        LOG.warn("Failed toString first key", e);
-      }
-      return result;
+      return KeyValue.keyToString(getFirstKey());
     }
 
     @Override
     protected String toStringLastKey() {
-      String result = "";
-      try {
-        result = HStoreKey.create(getLastKey()).toString();
-      } catch (IOException e) {
-        LOG.warn("Failed toString last key", e);
-      }
-      return result;
+      return KeyValue.keyToString(getLastKey());
     }
   }
 
   /**
-   * Override to add some customization on HalfHFileReader
+   * Override to add some customization on HalfHFileReader.
    */
   static class HalfStoreFileReader extends HalfHFileReader {
     public HalfStoreFileReader(FileSystem fs, Path p, BlockCache c, Reference r)
@@ -291,24 +279,12 @@
 
     @Override
     protected String toStringFirstKey() {
-      String result = "";
-      try {
-        result = HStoreKey.create(getFirstKey()).toString();
-      } catch (IOException e) {
-        LOG.warn("Failed toString first key", e);
-      }
-      return result;
+      return KeyValue.keyToString(getFirstKey());
     }
 
     @Override
     protected String toStringLastKey() {
-      String result = "";
-      try {
-        result = HStoreKey.create(getLastKey()).toString();
-      } catch (IOException e) {
-        LOG.warn("Failed toString last key", e);
-      }
-      return result;
+      return KeyValue.keyToString(getLastKey());
     }
   }
 
@@ -398,7 +374,7 @@
    */
   public static HFile.Writer getWriter(final FileSystem fs, final Path dir,
     final int blocksize, final Compression.Algorithm algorithm,
-    final HStoreKey.StoreKeyComparator c, final boolean bloomfilter)
+    final KeyValue.KeyComparator c, final boolean bloomfilter)
   throws IOException {
     if (!fs.exists(dir)) {
       fs.mkdirs(dir);
@@ -406,7 +382,7 @@
     Path path = getUniqueFile(fs, dir);
     return new HFile.Writer(fs, path, blocksize,
       algorithm == null? HFile.DEFAULT_COMPRESSION_ALGORITHM: algorithm,
-      c == null? new HStoreKey.StoreKeyComparator(): c, bloomfilter);
+      c == null? KeyValue.KEY_COMPARATOR: c, bloomfilter);
   }
 
   /**
@@ -501,7 +477,7 @@
     final StoreFile f, final byte [] splitRow, final Reference.Range range)
   throws IOException {
     // A reference to the bottom half of the hsf store file.
-    Reference r = new Reference(new HStoreKey(splitRow).getBytes(), range);
+    Reference r = new Reference(splitRow, range);
     // Add the referred-to regions name as a dot separated suffix. 
     // See REF_NAME_PARSER regex above.  The referred-to regions name is
     // up in the path of the passed in <code>f</code> -- parentdir is family,

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java?rev=764289&r1=764288&r2=764289&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java Sun Apr 12 10:39:55 2009
@@ -21,18 +21,15 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.SortedMap;
+import java.util.NavigableSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HStoreKey;
-import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * A scanner that iterates through HStore files
@@ -40,9 +37,7 @@
 class StoreFileScanner extends HAbstractScanner
 implements ChangedReadersObserver {
     // Keys retrieved from the sources
-  private volatile HStoreKey keys[];
-  // Values that correspond to those keys
-  private ByteBuffer [] vals;
+  private volatile KeyValue keys[];
   
   // Readers we go against.
   private volatile HFileScanner [] scanners;
@@ -52,18 +47,21 @@
   
   // Used around replacement of Readers if they change while we're scanning.
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-  
+
+  private final long now = System.currentTimeMillis();
+
   /**
    * @param store
    * @param timestamp
-   * @param targetCols
+   * @param columns
    * @param firstRow
+   * @param deletes Set of running deletes
    * @throws IOException
    */
   public StoreFileScanner(final Store store, final long timestamp,
-    final byte [][] targetCols, final byte [] firstRow)
+    final NavigableSet<byte []> columns, final byte [] firstRow)
   throws IOException {
-    super(timestamp, targetCols);
+    super(timestamp, columns);
     this.store = store;
     this.store.addChangedReaderObserver(this);
     try {
@@ -75,7 +73,7 @@
       throw e;
     }
   }
-  
+
   /*
    * Go open new scanners and cue them at <code>firstRow</code>.
    * Closes existing Readers if any.
@@ -90,12 +88,13 @@
        s.add(f.getReader().getScanner());
     }
     this.scanners = s.toArray(new HFileScanner [] {});
-    this.keys = new HStoreKey[this.scanners.length];
-    this.vals = new ByteBuffer[this.scanners.length];
+    this.keys = new KeyValue[this.scanners.length];
     // Advance the readers to the first pos.
+    KeyValue firstKey = (firstRow != null && firstRow.length > 0)?
+      new KeyValue(firstRow, HConstants.LATEST_TIMESTAMP): null;
     for (int i = 0; i < this.scanners.length; i++) {
-      if (firstRow != null && firstRow.length != 0) {
-        if (findFirstRow(i, firstRow)) {
+      if (firstKey != null) {
+        if (seekTo(i, firstKey)) {
           continue;
         }
       }
@@ -118,7 +117,7 @@
    * @throws IOException
    */
   boolean columnMatch(int i) throws IOException {
-    return columnMatch(keys[i].getColumn());
+    return columnMatch(keys[i]);
   }
 
   /**
@@ -132,7 +131,7 @@
    * @see org.apache.hadoop.hbase.regionserver.InternalScanner#next(org.apache.hadoop.hbase.HStoreKey, java.util.SortedMap)
    */
   @Override
-  public boolean next(HStoreKey key, SortedMap<byte [], Cell> results)
+  public boolean next(List<KeyValue> results)
   throws IOException {
     if (this.scannerClosed) {
       return false;
@@ -140,84 +139,63 @@
     this.lock.readLock().lock();
     try {
       // Find the next viable row label (and timestamp).
-      ViableRow viableRow = getNextViableRow();
+      KeyValue viable = getNextViableRow();
+      if (viable == null) {
+        return false;
+      }
 
       // Grab all the values that match this row/timestamp
-      boolean insertedItem = false;
-      if (viableRow.getRow() != null) {
-        key.setRow(viableRow.getRow());
-        key.setVersion(viableRow.getTimestamp());
-        for (int i = 0; i < keys.length; i++) {
-          // Fetch the data
-          while ((keys[i] != null) &&
-            (this.store.rawcomparator.compareRows(this.keys[i].getRow(),
-                viableRow.getRow()) == 0)) {
-            // If we are doing a wild card match or there are multiple matchers
-            // per column, we need to scan all the older versions of this row
-            // to pick up the rest of the family members
-            if(!isWildcardScanner()
-                && !isMultipleMatchScanner()
-                && (keys[i].getTimestamp() != viableRow.getTimestamp())) {
-              break;
-            }
-            if(columnMatch(i)) {
-              // We only want the first result for any specific family member
-              if(!results.containsKey(keys[i].getColumn())) {
-                results.put(keys[i].getColumn(), 
-                    new Cell(vals[i], keys[i].getTimestamp()));
-                insertedItem = true;
-              }
-            }
-
-            if (!getNext(i)) {
-              closeSubScanner(i);
+      boolean addedItem = false;
+      for (int i = 0; i < keys.length; i++) {
+        // Fetch the data
+        while ((keys[i] != null) &&
+            (this.store.comparator.compareRows(this.keys[i], viable) == 0)) {
+          // If we are doing a wild card match or there are multiple matchers
+          // per column, we need to scan all the older versions of this row
+          // to pick up the rest of the family members
+          if(!isWildcardScanner()
+              && !isMultipleMatchScanner()
+              && (keys[i].getTimestamp() != viable.getTimestamp())) {
+            break;
+          }
+          if (columnMatch(i)) {
+            // We only want the first result for any specific family member
+            // TODO: Do we have to keep a running list of column entries in
+            // the results across all of the StoreScanner, like we do
+            // when doing getFull?
+            if (!results.contains(keys[i])) {
+              results.add(keys[i]);
+              addedItem = true;
             }
           }
-          // Advance the current scanner beyond the chosen row, to
-          // a valid timestamp, so we're ready next time.
-          while ((keys[i] != null) &&
-            ((this.store.rawcomparator.compareRows(this.keys[i].getRow(),
-                viableRow.getRow()) <= 0) ||
-              (keys[i].getTimestamp() > this.timestamp) ||
-              (! columnMatch(i)))) {
-            getNext(i);
+
+          if (!getNext(i)) {
+            closeSubScanner(i);
           }
         }
+        // Advance the current scanner beyond the chosen row, to
+        // a valid timestamp, so we're ready next time.
+        while ((keys[i] != null) &&
+            ((this.store.comparator.compareRows(this.keys[i], viable) <= 0) ||
+                (keys[i].getTimestamp() > this.timestamp) ||
+                !columnMatch(i))) {
+          getNext(i);
+        }
       }
-      return insertedItem;
+      return addedItem;
     } finally {
       this.lock.readLock().unlock();
     }
   }
 
-  // Data stucture to hold next, viable row (and timestamp).
-  static class ViableRow {
-    private final byte [] row;
-    private final long ts;
-
-    ViableRow(final byte [] r, final long t) {
-      this.row = r;
-      this.ts = t;
-    }
-
-    byte [] getRow() {
-      return this.row;
-    }
-
-    long getTimestamp() {
-      return this.ts;
-    }
-  }
-
   /*
    * @return An instance of <code>ViableRow</code>
    * @throws IOException
    */
-  private ViableRow getNextViableRow() throws IOException {
+  private KeyValue getNextViableRow() throws IOException {
     // Find the next viable row label (and timestamp).
-    byte [] viableRow = null;
+    KeyValue viable = null;
     long viableTimestamp = -1;
-    long now = System.currentTimeMillis();
     long ttl = store.ttl;
     for (int i = 0; i < keys.length; i++) {
       // The first key that we find that matches may have a timestamp greater
@@ -235,15 +213,12 @@
           // If we get here and keys[i] is not null, we already know that the
           // column matches and the timestamp of the row is less than or equal
           // to this.timestamp, so we do not need to test that here
-          && ((viableRow == null) ||
-            (this.store.rawcomparator.compareRows(this.keys[i].getRow(),
-              viableRow) < 0) ||
-            ((this.store.rawcomparator.compareRows(this.keys[i].getRow(),
-                viableRow) == 0) &&
+          && ((viable == null) ||
+            (this.store.comparator.compareRows(this.keys[i], viable) < 0) ||
+            ((this.store.comparator.compareRows(this.keys[i], viable) == 0) &&
               (keys[i].getTimestamp() > viableTimestamp)))) {
         if (ttl == HConstants.FOREVER || now < keys[i].getTimestamp() + ttl) {
-          viableRow = keys[i].getRow();
-          viableTimestamp = keys[i].getTimestamp();
+          viable = keys[i];
         } else {
           if (LOG.isDebugEnabled()) {
             LOG.debug("getNextViableRow :" + keys[i] + ": expired, skipped");
@@ -251,7 +226,7 @@
         }
       }
     }
-    return new ViableRow(viableRow, viableTimestamp);
+    return viable;
   }
 
   /*
@@ -260,30 +235,25 @@
    *
    * @param i which iterator to advance
    * @param firstRow seek to this row
-   * @return true if this is the first row or if the row was not found
+   * @return true if the scanner is properly primed at the first row, or if
+   * the row was not found and this scanner has been closed as exhausted;
+   * false if the key found at the seek position is not usable (e.g. expired).
    */
-  private boolean findFirstRow(int i, final byte [] firstRow) throws IOException {
-    if (firstRow == null || firstRow.length <= 0) {
+  private boolean seekTo(int i, final KeyValue firstKey)
+  throws IOException {
+    if (firstKey == null) {
       if (!this.scanners[i].seekTo()) {
         closeSubScanner(i);
         return true;
       }
     } else {
-      if (!Store.getClosest(this.scanners[i], HStoreKey.getBytes(firstRow))) {
+      // TODO: sort columns and pass in column as part of key so we get closer.
+      if (!Store.getClosest(this.scanners[i], firstKey)) {
         closeSubScanner(i);
         return true;
       }
     }
-    this.keys[i] = HStoreKey.create(this.scanners[i].getKey());
-    this.vals[i] = this.scanners[i].getValue();
-    long now = System.currentTimeMillis();
-    long ttl = store.ttl;
-    if (ttl != HConstants.FOREVER && now >= this.keys[i].getTimestamp() + ttl) {
-      // Didn't find it. Close the scanner and return TRUE
-      closeSubScanner(i);
-      return true;
-    }
-    return columnMatch(i);
+    this.keys[i] = this.scanners[i].getKeyValue();
+    return isGoodKey(this.keys[i]);
   }
 
   /**
@@ -294,34 +264,33 @@
    */
   private boolean getNext(int i) throws IOException {
     boolean result = false;
-    long now = System.currentTimeMillis();
-    long ttl = store.ttl;
     while (true) {
       if ((this.scanners[i].isSeeked() && !this.scanners[i].next()) ||
           (!this.scanners[i].isSeeked() && !this.scanners[i].seekTo())) {
         closeSubScanner(i);
         break;
       }
-      this.keys[i] = HStoreKey.create(this.scanners[i].getKey());
-      if (keys[i].getTimestamp() <= this.timestamp) {
-        if (ttl == HConstants.FOREVER || now < keys[i].getTimestamp() + ttl) {
-          vals[i] = this.scanners[i].getValue();
+      this.keys[i] = this.scanners[i].getKeyValue();
+      if (isGoodKey(this.keys[i])) {
           result = true;
           break;
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("getNext: " + keys[i] + ": expired, skipped");
-        }
       }
     }
     return result;
   }
 
+  /*
+   * @param kv
+   * @return True if good key candidate.
+   */
+  private boolean isGoodKey(final KeyValue kv) {
+    return !Store.isExpired(kv, this.store.ttl, this.now);
+  }
+
   /** Close down the indicated reader. */
   private void closeSubScanner(int i) {
     this.scanners[i] = null;
     this.keys[i] = null;
-    this.vals[i] = null;
   }
 
   /** Shut it down! */
@@ -346,11 +315,10 @@
       // The keys are currently lined up at the next row to fetch.  Pass in
       // the current row as 'first' row and readers will be opened and cue'd
       // up so future call to next will start here.
-      ViableRow viableRow = getNextViableRow();
-      openScanner(viableRow.getRow());
+      KeyValue viable = getNextViableRow();
+      openScanner(viable.getRow());
       LOG.debug("Replaced Scanner Readers at row " +
-        (viableRow == null || viableRow.getRow() == null? "null":
-          Bytes.toString(viableRow.getRow())));
+        viable.getRow().toString());
     } finally {
       this.lock.writeLock().unlock();
     }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=764289&r1=764288&r2=764289&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Sun Apr 12 10:39:55 2009
@@ -21,20 +21,18 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
+import java.util.NavigableSet;
+import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
-import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -43,15 +41,14 @@
 class StoreScanner implements InternalScanner,  ChangedReadersObserver {
   static final Log LOG = LogFactory.getLog(StoreScanner.class);
 
-  private InternalScanner[] scanners;
-  private TreeMap<byte [], Cell>[] resultSets;
-  private HStoreKey[] keys;
+  private InternalScanner [] scanners;
+  private List<KeyValue> [] resultSets;
   private boolean wildcardMatch = false;
   private boolean multipleMatchers = false;
   private RowFilterInterface dataFilter;
   private Store store;
   private final long timestamp;
-  private final byte [][] targetCols;
+  private final NavigableSet<byte []> columns;
   
   // Indices for memcache scanner and hstorefile scanner.
   private static final int MEMS_INDEX = 0;
@@ -62,11 +59,11 @@
 
   // Used to indicate that the scanner has closed (see HBASE-1107)
   private final AtomicBoolean closing = new AtomicBoolean(false);
-  
+
   /** Create an Scanner with a handle on the memcache and HStore files. */
   @SuppressWarnings("unchecked")
-  StoreScanner(Store store, byte [][] targetCols, byte [] firstRow,
-    long timestamp, RowFilterInterface filter) 
+  StoreScanner(Store store, final NavigableSet<byte []> targetCols,
+    byte [] firstRow, long timestamp, RowFilterInterface filter) 
   throws IOException {
     this.store = store;
     this.dataFilter = filter;
@@ -74,12 +71,11 @@
       dataFilter.reset();
     }
     this.scanners = new InternalScanner[2];
-    this.resultSets = new TreeMap[scanners.length];
-    this.keys = new HStoreKey[scanners.length];
+    this.resultSets = new List[scanners.length];
     // Save these args in case we need them later handling change in readers
     // See updateReaders below.
     this.timestamp = timestamp;
-    this.targetCols = targetCols;
+    this.columns = targetCols;
     try {
       scanners[MEMS_INDEX] =
         store.memcache.getScanner(timestamp, targetCols, firstRow);
@@ -98,7 +94,6 @@
     for (int i = MEMS_INDEX; i < scanners.length; i++) {
       setupScanner(i);
     }
-    
     this.store.addChangedReaderObserver(this);
   }
   
@@ -120,10 +115,8 @@
    * @throws IOException
    */
   private void setupScanner(final int i) throws IOException {
-    this.keys[i] = new HStoreKey();
-    this.resultSets[i] = new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
-    if (this.scanners[i] != null && !this.scanners[i].next(this.keys[i],
-        this.resultSets[i])) {
+    this.resultSets[i] = new ArrayList<KeyValue>();
+    if (this.scanners[i] != null && !this.scanners[i].next(this.resultSets[i])) {
       closeScanner(i);
     }
   }
@@ -138,7 +131,7 @@
     return this.multipleMatchers;
   }
 
-  public boolean next(HStoreKey key, SortedMap<byte [], Cell> results)
+  public boolean next(List<KeyValue> results)
   throws IOException {
     this.lock.readLock().lock();
     try {
@@ -148,100 +141,82 @@
     boolean moreToFollow = true;
     while (filtered && moreToFollow) {
       // Find the lowest-possible key.
-      byte [] chosenRow = null;
+      KeyValue chosen = null;
       long chosenTimestamp = -1;
-      for (int i = 0; i < this.keys.length; i++) {
+      for (int i = 0; i < this.scanners.length; i++) {
+        KeyValue kv = this.resultSets[i] == null || this.resultSets[i].isEmpty()?
+          null: this.resultSets[i].get(0);
+        if (kv == null) {
+          continue;
+        }
         if (scanners[i] != null &&
-            (chosenRow == null ||
-            (this.store.rawcomparator.compareRows(this.keys[i].getRow(),
-              chosenRow) < 0) ||
-            ((this.store.rawcomparator.compareRows(this.keys[i].getRow(),
-              chosenRow) == 0) &&
-            (keys[i].getTimestamp() > chosenTimestamp)))) {
-          chosenRow = keys[i].getRow();
-          chosenTimestamp = keys[i].getTimestamp();
+            (chosen == null ||
+              (this.store.comparator.compareRows(kv, chosen) < 0) ||
+              ((this.store.comparator.compareRows(kv, chosen) == 0) &&
+              (kv.getTimestamp() > chosenTimestamp)))) {
+          chosen = kv;
+          chosenTimestamp = chosen.getTimestamp();
         }
       }
-      
+
       // Filter whole row by row key?
-      filtered = dataFilter != null? dataFilter.filterRowKey(chosenRow) : false;
+      filtered = dataFilter == null || chosen == null? false:
+        dataFilter.filterRowKey(chosen.getBuffer(), chosen.getRowOffset(),
+          chosen.getRowLength());
 
-      // Store the key and results for each sub-scanner. Merge them as
-      // appropriate.
+      // Store results for each sub-scanner.
       if (chosenTimestamp >= 0 && !filtered) {
-        // Here we are setting the passed in key with current row+timestamp
-        key.setRow(chosenRow);
-        key.setVersion(chosenTimestamp);
-        key.setColumn(HConstants.EMPTY_BYTE_ARRAY);
-        // Keep list of deleted cell keys within this row.  We need this
-        // because as we go through scanners, the delete record may be in an
-        // early scanner and then the same record with a non-delete, non-null
-        // value in a later. Without history of what we've seen, we'll return
-        // deleted values. This List should not ever grow too large since we
-        // are only keeping rows and columns that match those set on the
-        // scanner and which have delete values.  If memory usage becomes a
-        // problem, could redo as bloom filter.
-        Set<HStoreKey> deletes = new HashSet<HStoreKey>();
+        NavigableSet<KeyValue> deletes =
+          new TreeSet<KeyValue>(this.store.comparatorIgnoringType);
         for (int i = 0; i < scanners.length && !filtered; i++) {
-          while ((scanners[i] != null && !filtered && moreToFollow) &&
-            (this.store.rawcomparator.compareRows(this.keys[i].getRow(),
-              chosenRow) == 0)) {
-            // If we are doing a wild card match or there are multiple
-            // matchers per column, we need to scan all the older versions of 
-            // this row to pick up the rest of the family members
-            if (!wildcardMatch
-                && !multipleMatchers
-                && (keys[i].getTimestamp() != chosenTimestamp)) {
-              break;
+          if ((scanners[i] != null && !filtered && moreToFollow &&
+              this.resultSets[i] != null && !this.resultSets[i].isEmpty())) {
+            // Test this resultset is for the 'chosen' row.
+            KeyValue firstkv = resultSets[i].get(0);
+            if (!this.store.comparator.matchingRows(firstkv, chosen)) {
+              continue;
             }
-
-            // NOTE: We used to do results.putAll(resultSets[i]);
-            // but this had the effect of overwriting newer
-            // values with older ones. So now we only insert
-            // a result if the map does not contain the key.
-            HStoreKey hsk = new HStoreKey(key.getRow(),
-              HConstants.EMPTY_BYTE_ARRAY,
-              key.getTimestamp());
-            for (Map.Entry<byte [], Cell> e : resultSets[i].entrySet()) {
-              hsk.setColumn(e.getKey());
-              if (HLogEdit.isDeleted(e.getValue().getValue())) {
-                // Only first key encountered is added; deletes is a Set.
-                deletes.add(new HStoreKey(hsk));
-              } else if ((deletes.size() == 0 || !deletes.contains(hsk)) &&
-                  !filtered &&
-                  moreToFollow &&
-                  !results.containsKey(e.getKey())) {
-                if (dataFilter != null) {
+            // Its for the 'chosen' row, work it.
+            for (KeyValue kv: resultSets[i]) {
+              if (kv.isDeleteType()) {
+                deletes.add(kv);
+              } else if ((deletes.isEmpty() || !deletes.contains(kv)) &&
+                  !filtered && moreToFollow && !results.contains(kv)) {
+                if (this.dataFilter != null) {
                   // Filter whole row by column data?
-                  filtered = dataFilter.filterColumn(chosenRow, e.getKey(),
-                      e.getValue().getValue());
+                  int rowlength = kv.getRowLength();
+                  int columnoffset = kv.getColumnOffset(rowlength);
+                  filtered = dataFilter.filterColumn(kv.getBuffer(),
+                      kv.getRowOffset(), rowlength,
+                    kv.getBuffer(), columnoffset, kv.getColumnLength(columnoffset),
+                    kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
                   if (filtered) {
                     results.clear();
                     break;
                   }
                 }
-                results.put(e.getKey(), e.getValue());
+                results.add(kv);
+                /* REMOVING BECAUSE COULD BE BUNCH OF DELETES IN RESULTS
+                   AND WE WANT TO INCLUDE THEM -- below short-circuit is
+                   probably not wanted.
+                // If we are doing a wild card match or there are multiple
+                // matchers per column, we need to scan all the older versions of 
+                // this row to pick up the rest of the family members
+                if (!wildcardMatch && !multipleMatchers &&
+                    (kv.getTimestamp() != chosenTimestamp)) {
+                  break;
+                }
+                */
               }
             }
+            // Move on to next row.
             resultSets[i].clear();
-            if (!scanners[i].next(keys[i], resultSets[i])) {
+            if (!scanners[i].next(resultSets[i])) {
               closeScanner(i);
             }
           }
         }
       }
-      for (int i = 0; i < scanners.length; i++) {
-        // If the current scanner is non-null AND has a lower-or-equal
-        // row label, then its timestamp is bad. We need to advance it.
-        while ((scanners[i] != null) &&
-            (this.store.rawcomparator.compareRows(this.keys[i].getRow(),
-              chosenRow) <= 0)) {
-          resultSets[i].clear();
-          if (!scanners[i].next(keys[i], resultSets[i])) {
-            closeScanner(i);
-          }
-        }
-      }
 
       moreToFollow = chosenTimestamp >= 0;
       if (dataFilter != null) {
@@ -249,8 +224,8 @@
           moreToFollow = false;
         }
       }
-      
-      if (results.size() <= 0 && !filtered) {
+
+      if (results.isEmpty() && !filtered) {
         // There were no results found for this row.  Marked it as 
         // 'filtered'-out otherwise we will not move on to the next row.
         filtered = true;
@@ -258,7 +233,7 @@
     }
     
     // If we got no results, then there is no more to follow.
-    if (results == null || results.size() <= 0) {
+    if (results == null || results.isEmpty()) {
       moreToFollow = false;
     }
     
@@ -276,18 +251,18 @@
       this.lock.readLock().unlock();
     }
   }
-  
+
   /** Shut down a single scanner */
   void closeScanner(int i) {
     try {
       try {
         scanners[i].close();
       } catch (IOException e) {
-        LOG.warn(Bytes.toString(store.storeName) + " failed closing scanner " + i, e);
+        LOG.warn(Bytes.toString(store.storeName) + " failed closing scanner " +
+          i, e);
       }
     } finally {
       scanners[i] = null;
-      keys[i] = null;
       resultSets[i] = null;
     }
   }
@@ -321,8 +296,9 @@
         try {
           // I think its safe getting key from mem at this stage -- it shouldn't have
           // been flushed yet
+          // TODO: MAKE SURE WE UPDATE FROM TRUNK.
           this.scanners[HSFS_INDEX] = new StoreFileScanner(this.store,
-              this.timestamp, this. targetCols, this.keys[MEMS_INDEX].getRow());
+              this.timestamp, this. columns, this.resultSets[MEMS_INDEX].get(0).getRow());
           checkScannerFlags(HSFS_INDEX);
           setupScanner(HSFS_INDEX);
           LOG.debug("Added a StoreFileScanner to outstanding HStoreScanner");