You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2009/08/17 19:34:20 UTC

svn commit: r805063 - in /hadoop/hbase/trunk: ./ bin/ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/regionserver/ src/java/org/apache/hadoop/hbase/util/ src/test/org/apache/hadoop/hbase/ src...

Author: stack
Date: Mon Aug 17 17:34:19 2009
New Revision: 805063

URL: http://svn.apache.org/viewvc?rev=805063&view=rev
Log:
HBASE-1761 getclosest doesn't understand delete family; manifests as 'HRegionInfo was null or empty in .META' A.K.A the BS problem

Added:
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestGetClosestAtOrBefore.java
Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/bin/HBase.rb
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/KeyValue.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Put.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Result.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/GetDeleteTracker.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Bytes.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=805063&r1=805062&r2=805063&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Mon Aug 17 17:34:19 2009
@@ -335,6 +335,8 @@
    HBASE-1768  REST server has upper limit of 5k PUT
    HBASE-1766  Add advanced features to HFile.main() to be able to analyze
                storefile problems
+   HBASE-1761  getclosest doesn't understand delete family; manifests as
+               "HRegionInfo was null or empty in .META" A.K.A the BS problem
 
   IMPROVEMENTS
    HBASE-1089  Add count of regions on filesystem to master UI; add percentage

Modified: hadoop/hbase/trunk/bin/HBase.rb
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/bin/HBase.rb?rev=805063&r1=805062&r2=805063&view=diff
==============================================================================
--- hadoop/hbase/trunk/bin/HBase.rb (original)
+++ hadoop/hbase/trunk/bin/HBase.rb Mon Aug 17 17:34:19 2009
@@ -167,6 +167,7 @@
         raise IOError.new("Table " + tableName + " is enabled. Disable it first")
       else
         @admin.deleteTable(tableName)
+        flush(HConstants::META_TABLE_NAME);
         major_compact(HConstants::META_TABLE_NAME);
       end
       @formatter.footer(now)

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/KeyValue.java?rev=805063&r1=805062&r2=805063&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/KeyValue.java Mon Aug 17 17:34:19 2009
@@ -22,6 +22,7 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Comparator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -128,7 +129,7 @@
         return compare(a, 0, a.length, b, 0, b.length);
       }
     };
-  
+
   /**
    * Get the appropriate row comparator for the specified table.
    * 
@@ -291,11 +292,12 @@
   
   /**
    * Constructs KeyValue structure filled with null value.
+   * Sets type to {@link KeyValue.Type#Maximum}
    * @param row - row key (arbitrary byte array)
    * @param timestamp
    */
   public KeyValue(final byte [] row, final long timestamp) {
-    this(row, timestamp, Type.Put);
+    this(row, timestamp, Type.Maximum);
   }
 
   /**
@@ -309,13 +311,14 @@
 
   /**
    * Constructs KeyValue structure filled with null value.
+   * Sets type to {@link KeyValue.Type#Maximum}
    * @param row - row key (arbitrary byte array)
    * @param family family name
    * @param qualifier column qualifier
    */
   public KeyValue(final byte [] row, final byte [] family, 
       final byte [] qualifier) {
-    this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Put);
+    this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
   }
 
   /**
@@ -569,48 +572,9 @@
    * @return Fully copied clone of this KeyValue
    */
   public KeyValue clone() {
-    byte [] bytes = new byte[this.length];
-    System.arraycopy(this.bytes, this.offset, bytes, 0, this.length);
-    return new KeyValue(bytes, 0, bytes.length);
-  }
-
-  /**
-   * Clones a row.
-   * 
-   * @param timestamp  The new time stamp for the row.
-   * @return Clone of bb's key portion with only the row and timestamp filled in.
-   */
-  public KeyValue cloneRow(final long timestamp) {
-    return new KeyValue(getBuffer(), getRowOffset(), getRowLength(),
-        null, 0, 0, null, 0, 0, 
-        timestamp, Type.codeToType(getType()), null, 0, 0);
-  }
-
-  /**
-   * @return Clone of bb's key portion with type set to Type.Maximum. Use this
-   * doing lookups where you are doing getClosest.  Using Maximum, you'll be
-   * sure to trip over all of the other key types since Maximum sorts first.
-   */
-  public KeyValue cloneMaximum() {
-     return createKey(Type.Maximum);
-  }
-
-  /**
-   * Make a clone with the new type. Does not copy value.
-   * 
-   * @param newtype New type to set on clone of this key.
-   * @return Clone of this key with type set to <code>newtype</code>
-   */
-  private KeyValue createKey(final Type newtype) {
-    int keylength = getKeyLength();
-    int l = keylength + ROW_OFFSET;
-    byte [] other = new byte[l];
-    System.arraycopy(getBuffer(), getOffset(), other, 0, l);
-    // Set value length to zero.
-    Bytes.putInt(other, Bytes.SIZEOF_INT, 0);
-    // Set last byte, the type, to new type
-    other[l - 1] = newtype.getCode();
-    return new KeyValue(other, 0, other.length);
+    byte [] b = new byte[this.length];
+    System.arraycopy(this.bytes, this.offset, b, 0, this.length);
+    return new KeyValue(b, 0, b.length);
   }
 
   //---------------------------------------------------------------------------
@@ -946,17 +910,27 @@
   }
 
   /**
-   * @return True if Delete KeyValue type.
+   * @return True if a delete type, a {@link KeyValue.Type#Delete} or
+   * a {KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type.DeleteColumn}
+   * KeyValue type.
+   */
+  public boolean isDelete() {
+    int t = getType();
+    return Type.Delete.getCode() <= t && t <= Type.DeleteFamily.getCode();
+  }
+
+  /**
+   * @return True if this KV is a {@link KeyValue.Type#Delete} type.
    */
   public boolean isDeleteType() {
-    return getType() == Type.Delete.code;
+    return getType() == Type.Delete.getCode();
   }
 
   /**
-   * @return True if DeleteColumn KeyValue type.
+   * @return True if this KV is a delete family type.
    */
-  public boolean isDeleteColumnType() {
-    return getType() == Type.DeleteColumn.code;
+  public boolean isDeleteFamily() {
+    return getType() == Type.DeleteFamily.getCode();
   }
 
   /**
@@ -1258,13 +1232,13 @@
     return index;
   }
 
-  /*
+  /**
    * @param b
    * @param delimiter
-   * @return Index of delimiter having started from end of <code>b</code> moving
-   * leftward.
+   * @return Index of delimiter having started from start of <code>b</code>
+   * moving rightward.
    */
-  static int getDelimiter(final byte [] b, int offset, final int length,
+  public static int getDelimiter(final byte [] b, int offset, final int length,
       final int delimiter) {
     if (b == null) {
       throw new NullPointerException();
@@ -1279,12 +1253,13 @@
     return result;
   }
 
-  /*
+  /**
+   * Find index of passed delimiter walking from end of buffer backwards.
    * @param b
    * @param delimiter
    * @return Index of delimiter
    */
-  static int getDelimiterInReverse(final byte [] b, final int offset,
+  public static int getDelimiterInReverse(final byte [] b, final int offset,
       final int length, final int delimiter) {
     if (b == null) {
       throw new NullPointerException();
@@ -1659,6 +1634,21 @@
   }
 
   /**
+   * Comparator that compares row component only of a KeyValue.
+   */
+  public static class RowComparator implements Comparator<KeyValue> {
+    final KVComparator comparator;
+
+    public RowComparator(final KVComparator c) {
+      this.comparator = c;
+    }
+
+    public int compare(KeyValue left, KeyValue right) {
+      return comparator.compareRows(left, right);
+    }
+  }
+
+  /**
    * Compare key portion of a {@link KeyValue} for keys in <code>.META.</code>
    * table.
    */

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=805063&r1=805062&r2=805063&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java Mon Aug 17 17:34:19 2009
@@ -577,7 +577,7 @@
               REGIONINFO_QUALIFIER);
           if (value == null || value.length == 0) {
             throw new IOException("HRegionInfo was null or empty in " + 
-              Bytes.toString(parentTable) + ", " + regionInfoRow);
+              Bytes.toString(parentTable) + ", row=" + regionInfoRow);
           }
           // convert the row result into the HRegionLocation we need!
           HRegionInfo regionInfo = (HRegionInfo) Writables.getWritable(

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Put.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Put.java?rev=805063&r1=805062&r2=805063&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Put.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Put.java Mon Aug 17 17:34:19 2009
@@ -68,7 +68,7 @@
   public Put(byte [] row) {
     this(row, null);
   }
-  
+
   /**
    * Create a Put operation for the specified row, using an existing row lock.
    * @param row row key
@@ -83,7 +83,7 @@
       this.lockId = rowLock.getLockId();
     }
   }
-  
+
   /**
    * Copy constructor.  Creates a Put operation cloned from the specified Put.
    * @param putToCopy put to copy
@@ -333,8 +333,7 @@
     this.lockId = in.readLong();
     this.writeToWAL = in.readBoolean();
     int numFamilies = in.readInt();
-    this.familyMap = 
-      new TreeMap<byte [],List<KeyValue>>(Bytes.BYTES_COMPARATOR);
+    if (!this.familyMap.isEmpty()) this.familyMap.clear();
     for(int i=0;i<numFamilies;i++) {
       byte [] family = Bytes.readByteArray(in);
       int numKeys = in.readInt();
@@ -359,7 +358,7 @@
     out.writeLong(this.lockId);
     out.writeBoolean(this.writeToWAL);
     out.writeInt(familyMap.size());
-    for(Map.Entry<byte [], List<KeyValue>> entry : familyMap.entrySet()) {
+    for (Map.Entry<byte [], List<KeyValue>> entry : familyMap.entrySet()) {
       Bytes.writeByteArray(out, entry.getKey());
       List<KeyValue> keys = entry.getValue();
       out.writeInt(keys.size());

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Result.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Result.java?rev=805063&r1=805062&r2=805063&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Result.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Result.java Mon Aug 17 17:34:19 2009
@@ -499,4 +499,4 @@
     }
     return results;
   }
-}
+}
\ No newline at end of file

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java?rev=805063&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java Mon Aug 17 17:34:19 2009
@@ -0,0 +1,240 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * State and utility processing {@link HRegion#getClosestRowBefore(byte[], byte[])}.
+ * Like {@link GetDeleteTracker} and {@link ScanDeleteTracker} but does not
+ * implement the {@link DeleteTracker} interface since state spans rows (There
+ * is no update nor reset method).
+ */
+class GetClosestRowBeforeTracker {
+  private final KeyValue targetkey;
+  // Any cell w/ a ts older than this is expired.
+  private final long oldestts;
+  private KeyValue candidate = null;
+  private final KVComparator kvcomparator;
+  // Flag for whether we're doing getclosest on a metaregion.
+  private final boolean metaregion;
+  // Offset and length into targetkey demarking table name (if in a metaregion).
+  private final int rowoffset;
+  private final int tablenamePlusDelimiterLength;
+
+  // Deletes keyed by row.  Comparator compares on row portion of KeyValue only.
+  private final NavigableMap<KeyValue, NavigableSet<KeyValue>> deletes;
+
+  /**
+   * @param c
+   * @param kv Presume first on row: i.e. empty column, maximum timestamp and
+   * a type of Type.Maximum
+   * @param ttl Time to live in ms for this Store
+   * @param metaregion True if this is .META. or -ROOT- region.
+   */
+  GetClosestRowBeforeTracker(final KVComparator c, final KeyValue kv,
+      final long ttl, final boolean metaregion) {
+    super();
+    this.metaregion = metaregion;
+    this.targetkey = kv;
+    // If we are in a metaregion, then our table name is the prefix on the
+    // targetkey.
+    this.rowoffset = kv.getRowOffset();
+    int l = -1;
+    if (metaregion) {
+      l = KeyValue.getDelimiter(kv.getBuffer(), rowoffset, kv.getRowLength(),
+        HRegionInfo.DELIMITER) - this.rowoffset;
+    }
+    this.tablenamePlusDelimiterLength = metaregion? l + 1: -1;
+    this.oldestts = System.currentTimeMillis() - ttl;
+    this.kvcomparator = c;
+    KeyValue.RowComparator rc = new KeyValue.RowComparator(this.kvcomparator);
+    this.deletes = new TreeMap<KeyValue, NavigableSet<KeyValue>>(rc);
+  }
+
+  /**
+   * @param kv
+   * @return True if this <code>kv</code> is expired.
+   */
+  boolean isExpired(final KeyValue kv) {
+    return Store.isExpired(kv, this.oldestts);
+  }
+
+  /*
+   * Add the specified KeyValue to the list of deletes.
+   * @param kv
+   */
+  private void addDelete(final KeyValue kv) {
+    NavigableSet<KeyValue> rowdeletes = this.deletes.get(kv);
+    if (rowdeletes == null) {
+      rowdeletes = new TreeSet<KeyValue>(this.kvcomparator);
+      this.deletes.put(kv, rowdeletes);
+    }
+    rowdeletes.add(kv);
+  }
+
+  /*
+   * @param kv Adds candidate if nearer the target than previous candidate.
+   * @return True if updated candidate.
+   */
+  private boolean addCandidate(final KeyValue kv) {
+    if (!isDeleted(kv) && isBetterCandidate(kv)) {
+      this.candidate = kv;
+      return true;
+    }
+    return false;
+  }
+
+  boolean isBetterCandidate(final KeyValue contender) {
+    return this.candidate == null ||
+      (this.kvcomparator.compareRows(this.candidate, contender) < 0 &&
+        this.kvcomparator.compareRows(contender, this.targetkey) <= 0);
+  }
+
+  /*
+   * Check if specified KeyValue buffer has been deleted by a previously
+   * seen delete.
+   * @param kv
+   * @return true is the specified KeyValue is deleted, false if not
+   */
+  private boolean isDeleted(final KeyValue kv) {
+    if (this.deletes.isEmpty()) return false;
+    NavigableSet<KeyValue> rowdeletes = this.deletes.get(kv);
+    if (rowdeletes == null || rowdeletes.isEmpty()) return false;
+    return isDeleted(kv, rowdeletes);
+  }
+
+  /** 
+   * Check if the specified KeyValue buffer has been deleted by a previously
+   * seen delete.
+   * @param kv
+   * @param ds
+   * @return True is the specified KeyValue is deleted, false if not
+   */
+  public boolean isDeleted(final KeyValue kv, final NavigableSet<KeyValue> ds) {
+    if (deletes == null || deletes.isEmpty()) return false;
+    for (KeyValue d: ds) {
+      long kvts = kv.getTimestamp();
+      long dts = d.getTimestamp();
+      if (d.isDeleteFamily()) {
+        if (kvts <= dts) return true;
+        continue;
+      }
+      // Check column
+      int ret = Bytes.compareTo(kv.getBuffer(), kv.getQualifierOffset(),
+          kv.getQualifierLength(),
+        d.getBuffer(), d.getQualifierOffset(), d.getQualifierLength());
+      if (ret <= -1) {
+        // This delete is for an earlier column.
+        continue;
+      } else if (ret >= 1) {
+        // Beyond this kv.
+        break;
+      }
+      // Check Timestamp
+      if (kvts > dts) return false;
+
+      // Check Type
+      switch (KeyValue.Type.codeToType(d.getType())) {
+        case Delete: return kvts == dts;
+        case DeleteColumn: return true;
+        default: continue;
+      }
+    }
+    return false;
+  }
+
+  /*
+   * Handle keys whose values hold deletes.
+   * Add to the set of deletes and then if the candidate keys contain any that
+   * might match, then check for a match and remove it.  Implies candidates
+   * is made with a Comparator that ignores key type.
+   * @param kv
+   * @return True if we removed <code>k</code> from <code>candidates</code>.
+   */
+  boolean handleDeletes(final KeyValue kv) {
+    addDelete(kv);
+    boolean deleted = false;
+    if (!hasCandidate()) return deleted;
+    if (isDeleted(this.candidate)) {
+      this.candidate = null;
+      deleted = true;
+    }
+    return deleted;
+  }
+
+  /**
+   * Do right thing with passed key, add to deletes or add to candidates.
+   * @param kv
+   * @return True if we added a candidate
+   */
+  boolean handle(final KeyValue kv) {
+    if (kv.isDelete()) {
+      handleDeletes(kv);
+      return false;
+    }
+    return addCandidate(kv);
+  }
+
+  /**
+   * @return True if has candidate
+   */
+  public boolean hasCandidate() {
+    return this.candidate != null;
+  }
+
+  /**
+   * @return Best candidate or null.
+   */
+  public KeyValue getCandidate() {
+    return this.candidate;
+  }
+
+  public KeyValue getTargetKey() {
+    return this.targetkey;
+  }
+
+  /**
+   * @param kv Current kv
+   * @param First on row kv.
+   * @param state
+   * @return True if we went too far, past the target key.
+   */
+  boolean isTooFar(final KeyValue kv, final KeyValue firstOnRow) {
+    return this.kvcomparator.compareRows(kv, firstOnRow) > 0;
+  }
+
+  boolean isTargetTable(final KeyValue kv) {
+    if (!metaregion) return true;
+    // Compare start of keys row.  Compare including delimiter.  Saves having
+    // to calculate where tablename ends in the candidate kv.
+    return Bytes.compareTo(this.targetkey.getBuffer(), this.rowoffset,
+        this.tablenamePlusDelimiterLength,
+      kv.getBuffer(), kv.getRowOffset(), this.tablenamePlusDelimiterLength) == 0;
+  }
+} 
\ No newline at end of file

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/GetDeleteTracker.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/GetDeleteTracker.java?rev=805063&r1=805062&r2=805063&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/GetDeleteTracker.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/GetDeleteTracker.java Mon Aug 17 17:34:19 2009
@@ -38,8 +38,8 @@
  * This class is NOT thread-safe as queries are never multi-threaded 
  */
 public class GetDeleteTracker implements DeleteTracker {
-
-  private long familyStamp = -1L;
+  private static long UNSET = -1L;
+  private long familyStamp = UNSET;
   protected List<Delete> deletes = null;
   private List<Delete> newDeletes = new ArrayList<Delete>();
   private Iterator<Delete> iterator;
@@ -64,7 +64,7 @@
   @Override
   public void add(byte [] buffer, int qualifierOffset, int qualifierLength,
       long timestamp, byte type) {
-    if(type == KeyValue.Type.DeleteFamily.getCode()) {
+    if (type == KeyValue.Type.DeleteFamily.getCode()) {
       if(timestamp > familyStamp) {
         familyStamp = timestamp;
       }
@@ -88,14 +88,13 @@
   @Override
   public boolean isDeleted(byte [] buffer, int qualifierOffset,
       int qualifierLength, long timestamp) {
-
     // Check against DeleteFamily
     if (timestamp <= familyStamp) {
       return true;
     }
 
     // Check if there are other deletes
-    if(this.delete == null) {
+    if (this.delete == null) {
       return false;
     }
 
@@ -103,7 +102,7 @@
     int ret = Bytes.compareTo(buffer, qualifierOffset, qualifierLength,
         this.delete.buffer, this.delete.qualifierOffset, 
         this.delete.qualifierLength);
-    if(ret <= -1) {
+    if (ret <= -1) {
       // Have not reached the next delete yet
       return false;
     } else if(ret >= 1) {
@@ -149,10 +148,8 @@
 
   @Override
   public boolean isEmpty() {
-    if(this.familyStamp == 0L && this.delete == null) {
-      return true;
-    }
-    return false;
+    return this.familyStamp == UNSET && this.delete == null &&
+      this.newDeletes.isEmpty();
   }
 
   @Override
@@ -160,7 +157,7 @@
     this.deletes = null;
     this.delete = null;
     this.newDeletes = new ArrayList<Delete>();
-    this.familyStamp = 0L;
+    this.familyStamp = UNSET;
     this.iterator = null;
   }
 
@@ -173,7 +170,7 @@
   @Override
   public void update() {
     // If no previous deletes, use new deletes and return
-    if(this.deletes == null || this.deletes.size() == 0) {
+    if (this.deletes == null || this.deletes.size() == 0) {
       finalize(this.newDeletes);
       return;
     }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=805063&r1=805062&r2=805063&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Aug 17 17:34:19 2009
@@ -1022,9 +1022,8 @@
    * @return map of values
    * @throws IOException
    */
-  public Result getClosestRowBefore(final byte [] row,
-    final byte [] family)
-  throws IOException{
+  public Result getClosestRowBefore(final byte [] row, final byte [] family)
+  throws IOException {
     // look across all the HStores for this region and determine what the
     // closest key is across all column families, since the data may be sparse
     KeyValue key = null;
@@ -1038,22 +1037,16 @@
       if (key == null) {
         return null;
       }
-      List<KeyValue> results = new ArrayList<KeyValue>();
-      // This will get all results for this store.  TODO: Do I have to make a
-      // new key?
-      if (!this.comparator.matchingRows(kv, key)) {
-        kv = new KeyValue(key.getRow(), HConstants.LATEST_TIMESTAMP);
-      }
+      // This will get all results for this store.  TODO: Do we need to do this?
       Get get = new Get(key.getRow());
+      List<KeyValue> results = new ArrayList<KeyValue>();
       store.get(get, null, results);
-      
       return new Result(results);
     } finally {
       splitsAndClosesLock.readLock().unlock();
     }
   }
 
-  //TODO
   /**
    * Return an iterator that scans over the HRegion, returning the indicated 
    * columns and rows specified by the {@link Scan}.
@@ -1166,7 +1159,8 @@
           Get g = new Get(kv.getRow());
           NavigableSet<byte []> qualifiers =
             new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
-          qualifiers.add(kv.getQualifier());
+          byte [] q = kv.getQualifier();
+          if (q != null && q.length > 0) qualifiers.add(kv.getQualifier());
           get(store, g, qualifiers, result);
           if (result.isEmpty()) {
             // Nothing to delete
@@ -2432,6 +2426,7 @@
       } else {
         // Default behavior
         Scan scan = new Scan();
+        // scan.addFamily(HConstants.CATALOG_FAMILY);
         InternalScanner scanner = region.getScanner(scan);
         try {
           List<KeyValue> kvs = new ArrayList<KeyValue>();
@@ -2444,6 +2439,7 @@
         } finally {
           scanner.close();
         }
+        // System.out.println(region.getClosestRowBefore(Bytes.toBytes("GeneratedCSVContent2,E3652782193BC8D66A0BA1629D0FAAAB,9993372036854775807")));
       }
     } finally {
       region.close();
@@ -2481,7 +2477,6 @@
         printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
       }
       majorCompact = true;
-    
     }
     Path tableDir  = new Path(args[0]);
     HBaseConfiguration c = new HBaseConfiguration();
@@ -2497,4 +2492,4 @@
        if (bc != null) bc.shutdown();
      }
   }
-}
+}
\ No newline at end of file

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=805063&r1=805062&r2=805063&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java Mon Aug 17 17:34:19 2009
@@ -29,7 +29,6 @@
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.SortedSet;
-import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -56,8 +55,6 @@
 public class MemStore implements HeapSize {
   private static final Log LOG = LogFactory.getLog(MemStore.class);
 
-  private final long ttl;
-
   // MemStore.  Use a KeyValueSkipListSet rather than SkipListSet because of the
   // better semantics.  The Map will overwrite if passed a key it already had
   // whereas the Set will not add new KV if key is same though value might be
@@ -68,7 +65,7 @@
   // Snapshot of memstore.  Made for flusher.
   volatile KeyValueSkipListSet snapshot;
 
-  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
   final KeyValue.KVComparator comparator;
 
@@ -85,7 +82,7 @@
    * Default constructor. Used for tests.
    */
   public MemStore() {
-    this(HConstants.FOREVER, KeyValue.COMPARATOR);
+    this(KeyValue.COMPARATOR);
   }
 
   /**
@@ -93,8 +90,7 @@
    * @param ttl The TTL for cache entries, in milliseconds.
    * @param c
    */
-  public MemStore(final long ttl, final KeyValue.KVComparator c) {
-    this.ttl = ttl;
+  public MemStore(final KeyValue.KVComparator c) {
     this.comparator = c;
     this.comparatorIgnoreTimestamp =
       this.comparator.getComparatorIgnoringTimestamps();
@@ -184,15 +180,15 @@
    * @return approximate size of the passed key and value.
    */
   long add(final KeyValue kv) {
-    long size = -1;
+    long s = -1;
     this.lock.readLock().lock();
     try {
-      size = heapSizeChange(kv, this.kvset.add(kv));
-      this.size.addAndGet(size);
+      s = heapSizeChange(kv, this.kvset.add(kv));
+      this.size.addAndGet(s);
     } finally {
       this.lock.readLock().unlock();
     }
-    return size;
+    return s;
   }
 
   /** 
@@ -201,7 +197,7 @@
    * @return approximate size of the passed key and value.
    */
   long delete(final KeyValue delete) {
-    long size = 0;
+    long s = 0;
     this.lock.readLock().lock();
     //Have to find out what we want to do here, to find the fastest way of
     //removing things that are under a delete.
@@ -261,17 +257,17 @@
       //Delete all the entries effected by the last added delete
       for (KeyValue kv : deletes) {
         notpresent = this.kvset.remove(kv);
-        size -= heapSizeChange(kv, notpresent);
+        s -= heapSizeChange(kv, notpresent);
       }
       
       // Adding the delete to memstore. Add any value, as long as
       // same instance each time.
-      size += heapSizeChange(delete, this.kvset.add(delete));
+      s += heapSizeChange(delete, this.kvset.add(delete));
     } finally {
       this.lock.readLock().unlock();
     }
-    this.size.addAndGet(size);
-    return size;
+    this.size.addAndGet(s);
+    return s;
   }
   
   /**
@@ -325,200 +321,122 @@
     return result;
   }
 
-
-  /**
-   * @param row Row to look for.
-   * @param candidateKeys Map of candidate keys (Accumulation over lots of
-   * lookup over stores and memstores)
-   */
-  void getRowKeyAtOrBefore(final KeyValue row,
-      final NavigableSet<KeyValue> candidateKeys) {
-    getRowKeyAtOrBefore(row, candidateKeys,
-      new TreeSet<KeyValue>(this.comparator), System.currentTimeMillis());
-  }
-
   /**
-   * @param kv Row to look for.
-   * @param candidates Map of candidate keys (Accumulation over lots of
-   * lookup over stores and memstores).  Pass a Set with a Comparator that
-   * ignores key Type so we can do Set.remove using a delete, i.e. a KeyValue
-   * with a different Type to the candidate key.
-   * @param deletes Pass a Set that has a Comparator that ignores key type.
-   * @param now
-   */
-  void getRowKeyAtOrBefore(final KeyValue kv,
-      final NavigableSet<KeyValue> candidates, 
-      final NavigableSet<KeyValue> deletes, final long now) {
+   * @param state
+   */
+  void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
     this.lock.readLock().lock();
     try {
-      getRowKeyAtOrBefore(kvset, kv, candidates, deletes, now);
-      getRowKeyAtOrBefore(snapshot, kv, candidates, deletes, now);
+      getRowKeyAtOrBefore(kvset, state);
+      getRowKeyAtOrBefore(snapshot, state);
     } finally {
       this.lock.readLock().unlock();
     }
   }
 
+  /*
+   * @param set
+   * @param state Accumulates deletes and candidates.
+   */
   private void getRowKeyAtOrBefore(final NavigableSet<KeyValue> set,
-      final KeyValue kv, final NavigableSet<KeyValue> candidates,
-      final NavigableSet<KeyValue> deletes, final long now) {
+      final GetClosestRowBeforeTracker state) {
     if (set.isEmpty()) {
       return;
     }
-    // We want the earliest possible to start searching from.  Start before
-    // the candidate key in case it turns out a delete came in later.
-    KeyValue search = candidates.isEmpty()? kv: candidates.first();
-
-    // Get all the entries that come equal or after our search key
-    SortedSet<KeyValue> tail = set.tailSet(search);
-
-    // if there are items in the tail map, there's either a direct match to
-    // the search key, or a range of values between the first candidate key
-    // and the ultimate search key (or the end of the cache)
-    if (!tail.isEmpty() &&
-        this.comparator.compareRows(tail.first(), search) <= 0) {
-      // Keep looking at cells as long as they are no greater than the 
-      // ultimate search key and there's still records left in the map.
-      KeyValue deleted = null;
-      KeyValue found = null;
-      for (Iterator<KeyValue> iterator = tail.iterator();
-        iterator.hasNext() && (found == null ||
-          this.comparator.compareRows(found, kv) <= 0);) {
-        found = iterator.next();
-        if (this.comparator.compareRows(found, kv) <= 0) {
-          if (found.isDeleteType()) {
-            Store.handleDeletes(found, candidates, deletes);
-            if (deleted == null) {
-              deleted = found;
-            }
-          } else {
-            if (Store.notExpiredAndNotInDeletes(this.ttl, found, now, deletes)) {
-              candidates.add(found);
-            } else {
-              if (deleted == null) {
-                deleted = found;
-              }
-              // TODO: Check this removes the right key.
-              // Its expired.  Remove it.
-              iterator.remove();
-            }
-          }
-        }
+    if (!walkForwardInSingleRow(set, state.getTargetKey(), state)) {
+      // Found nothing in row.  Try backing up.
+      getRowKeyBefore(set, state);
+    }
+  }
+
+  /*
+   * Walk forward in a row from <code>firstOnRow</code>.  Presumption is that
+   * we have been passed the first possible key on a row.  As we walk forward
+   * we accumulate deletes until we hit a candidate on the row at which point
+   * we return.
+   * @param set
+   * @param firstOnRow First possible key on this row.
+   * @param state
+   * @return True if we found a candidate walking this row.
+   */
+  private boolean walkForwardInSingleRow(final SortedSet<KeyValue> set,
+      final KeyValue firstOnRow, final GetClosestRowBeforeTracker state) {
+    boolean foundCandidate = false;
+    SortedSet<KeyValue> tail = set.tailSet(firstOnRow);
+    if (tail.isEmpty()) return foundCandidate;
+    for (Iterator<KeyValue> i = tail.iterator(); i.hasNext();) {
+      KeyValue kv = i.next();
+      // Did we go beyond the target row? If so break.
+      if (state.isTooFar(kv, firstOnRow)) break;
+      if (state.isExpired(kv)) {
+        i.remove();
+        continue;
       }
-      if (candidates.isEmpty() && deleted != null) {
-        getRowKeyBefore(set, deleted, candidates, deletes, now);
+      // If we added something, this row is a contender. break.
+      if (state.handle(kv)) {
+        foundCandidate = true;
+        break;
       }
-    } else {
-      // The tail didn't contain any keys that matched our criteria, or was 
-      // empty. Examine all the keys that proceed our splitting point.
-      getRowKeyBefore(set, search, candidates, deletes, now);
     }
+    return foundCandidate;
   }
 
   /*
-   * Get row key that comes before passed <code>search_key</code>
-   * Use when we know search_key is not in the map and we need to search
-   * earlier in the cache.
+   * Walk backwards through the passed set a row at a time until we run out of
+   * set or until we get a candidate.
    * @param set
-   * @param search
-   * @param candidates
-   * @param deletes Pass a Set that has a Comparator that ignores key type.
-   * @param now
+   * @param state
    */
   private void getRowKeyBefore(NavigableSet<KeyValue> set,
-      KeyValue search, NavigableSet<KeyValue> candidates,
-      final NavigableSet<KeyValue> deletes, final long now) {
-    NavigableSet<KeyValue> head = set.headSet(search, false);
-    // If we tried to create a headMap and got an empty map, then there are
-    // no keys at or before the search key, so we're done.
-    if (head.isEmpty()) {
-      return;
+      final GetClosestRowBeforeTracker state) {
+    KeyValue firstOnRow = state.getTargetKey();
+    for (Member p = memberOfPreviousRow(set, state, firstOnRow);
+        p != null; p = memberOfPreviousRow(p.set, state, firstOnRow)) {
+      // Make sure we don't fall out of our table.
+      if (!state.isTargetTable(p.kv)) break;
+      // Stop looking if we've exited the better candidate range.
+      if (!state.isBetterCandidate(p.kv)) break;
+      // Make into firstOnRow
+      firstOnRow = new KeyValue(p.kv.getRow(), HConstants.LATEST_TIMESTAMP);
+      // If we find something, break;
+      if (walkForwardInSingleRow(p.set, firstOnRow, state)) break;
     }
+  }
 
-    // If there aren't any candidate keys at this point, we need to search
-    // backwards until we find at least one candidate or run out of headMap.
-    if (candidates.isEmpty()) {
-      KeyValue lastFound = null;
-      // TODO: Confirm we're iterating in the right order
-      for (Iterator<KeyValue> i = head.descendingIterator();
-          i.hasNext();) {
-        KeyValue found = i.next();
-        // if the last row we found a candidate key for is different than
-        // the row of the current candidate, we can stop looking -- if its
-        // not a delete record.
-        boolean deleted = found.isDeleteType();
-        if (lastFound != null &&
-            this.comparator.matchingRows(lastFound, found) && !deleted) {
-          break;
-        }
-        // If this isn't a delete, record it as a candidate key. Also 
-        // take note of this candidate so that we'll know when
-        // we cross the row boundary into the previous row.
-        if (!deleted) {
-          if (Store.notExpiredAndNotInDeletes(this.ttl, found, now, deletes)) {
-            lastFound = found;
-            candidates.add(found);
-          } else {
-            // Its expired.
-            Store.expiredOrDeleted(set, found);
-          }
-        } else {
-          // We are encountering items in reverse.  We may have just added
-          // an item to candidates that this later item deletes.  Check.  If we
-          // found something in candidates, remove it from the set.
-          if (Store.handleDeletes(found, candidates, deletes)) {
-            remove(set, found);
-          }
-        }
-      }
-    } else {
-      // If there are already some candidate keys, we only need to consider
-      // the very last row's worth of keys in the headMap, because any 
-      // smaller acceptable candidate keys would have caused us to start
-      // our search earlier in the list, and we wouldn't be searching here.
-      SortedSet<KeyValue> rowTail = 
-        head.tailSet(head.last().cloneRow(HConstants.LATEST_TIMESTAMP));
-      Iterator<KeyValue> i = rowTail.iterator();
-      do {
-        KeyValue found = i.next();
-        if (found.isDeleteType()) {
-          Store.handleDeletes(found, candidates, deletes);
-        } else {
-          if (ttl == HConstants.FOREVER ||
-              now < found.getTimestamp() + ttl ||
-              !deletes.contains(found)) {
-            candidates.add(found);
-          } else {
-            Store.expiredOrDeleted(set, found);
-          }
-        }
-      } while (i.hasNext());
+  /*
+   * Immutable data structure to hold member found in set and the set it was
+   * found in.  Include set because it is carrying context.
+   */
+  private class Member {
+    final KeyValue kv;
+    final NavigableSet<KeyValue> set;
+    Member(final NavigableSet<KeyValue> s, final KeyValue kv) {
+      this.kv = kv;
+      this.set = s;
     }
   }
 
-
   /*
-   * @param set
-   * @param delete This is a delete record.  Remove anything behind this of same
-   * r/c/ts.
-   * @return True if we removed anything.
-   */
-  private boolean remove(final NavigableSet<KeyValue> set,
-      final KeyValue delete) {
-    SortedSet<KeyValue> s = set.tailSet(delete);
-    if (s.isEmpty()) {
-      return false;
-    }
-    boolean removed = false;
-    for (KeyValue kv: s) {
-      if (this.comparatorIgnoreType.compare(kv, delete) == 0) {
-        // Same r/c/ts.  Remove it.
-        s.remove(kv);
-        removed = true;
+   * @param set Set to walk back in.  Pass a first in row or we'll return
+   * same row (loop).
+   * @param state Utility and context.
+   * @param firstOnRow First item on the row after the one we want to find a
+   * member in.
+   * @return Null or member of row previous to <code>firstOnRow</code>
+   */
+  private Member memberOfPreviousRow(NavigableSet<KeyValue> set,
+      final GetClosestRowBeforeTracker state, final KeyValue firstOnRow) {
+    NavigableSet<KeyValue> head = set.headSet(firstOnRow, false);
+    if (head.isEmpty()) return null;
+    for (Iterator<KeyValue> i = head.descendingIterator(); i.hasNext();) {
+      KeyValue found = i.next();
+      if (state.isExpired(found)) {
+        i.remove();
         continue;
       }
-      break;
+      return new Member(head, found);
     }
-    return removed;
+    return null;
   }
 
   /**
@@ -689,7 +607,7 @@
   }
   
   public final static long FIXED_OVERHEAD = ClassSize.align(
-      ClassSize.OBJECT + Bytes.SIZEOF_LONG + (7 * ClassSize.REFERENCE));
+      ClassSize.OBJECT + (7 * ClassSize.REFERENCE));
   
   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=805063&r1=805062&r2=805063&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java Mon Aug 17 17:34:19 2009
@@ -50,7 +50,6 @@
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
@@ -58,6 +57,7 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
 
@@ -181,7 +181,7 @@
       // second -> ms adjust for user data
       this.ttl *= 1000;
     }
-    this.memstore = new MemStore(this.ttl, this.comparator);
+    this.memstore = new MemStore(this.comparator);
     this.regionCompactionDir = new Path(HRegion.getCompactionDir(basedir), 
         Integer.toString(info.getEncodedName()));
     this.storeName = this.family.getName();
@@ -1028,293 +1028,144 @@
     }
   }
 
+  static boolean isExpired(final KeyValue key, final long oldestTimestamp) {
+    return key.getTimestamp() < oldestTimestamp;
+  }
+
   /**
    * Find the key that matches <i>row</i> exactly, or the one that immediately
    * preceeds it. WARNING: Only use this method on a table where writes occur 
-   * with stricly increasing timestamps. This method assumes this pattern of 
-   * writes in order to make it reasonably performant.
-   * @param targetkey
-   * @return Found keyvalue
+   * with strictly increasing timestamps. This method assumes this pattern of 
+   * writes in order to make it reasonably performant.  Also our search is
+   * dependent on the axiom that deletes are for cells that are in the container
+   * that follows whether a memstore snapshot or a storefile, not for the
+   * current container: i.e. we'll see deletes before we come across cells we
+   * are to delete. Presumption is that the memstore#kvset is processed before
+   * memstore#snapshot and so on.
+   * @param kv First possible item on targeted row; i.e. empty columns, latest
+   * timestamp and maximum type.
+   * @return Found keyvalue or null if none found.
    * @throws IOException
    */
-  KeyValue getRowKeyAtOrBefore(final KeyValue targetkey)
-  throws IOException{
-    // Map of keys that are candidates for holding the row key that
-    // most closely matches what we're looking for. We'll have to update it as
-    // deletes are found all over the place as we go along before finally
-    // reading the best key out of it at the end.   Use a comparator that
-    // ignores key types.  Otherwise, we can't remove deleted items doing
-    // set.remove because of the differing type between insert and delete.
-    NavigableSet<KeyValue> candidates =
-      new TreeSet<KeyValue>(this.comparator.getComparatorIgnoringType());
-
-    // Keep a list of deleted cell keys.  We need this because as we go through
-    // the store files, the cell with the delete marker may be in one file and
-    // the old non-delete cell value in a later store file. If we don't keep
-    // around the fact that the cell was deleted in a newer record, we end up
-    // returning the old value if user is asking for more than one version.
-    // This List of deletes should not be large since we are only keeping rows
-    // and columns that match those set on the scanner and which have delete
-    // values.  If memory usage becomes an issue, could redo as bloom filter.
-    NavigableSet<KeyValue> deletes =
-      new TreeSet<KeyValue>(this.comparatorIgnoringType);
-    long now = System.currentTimeMillis();
+  KeyValue getRowKeyAtOrBefore(final KeyValue kv)
+  throws IOException {
+    GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker(
+      this.comparator, kv, this.ttl, this.region.getRegionInfo().isMetaRegion());
     this.lock.readLock().lock();
     try {
       // First go to the memstore.  Pick up deletes and candidates.
-      this.memstore.getRowKeyAtOrBefore(targetkey, candidates, deletes, now);
-      // Process each store file.  Run through from newest to oldest.
+      this.memstore.getRowKeyAtOrBefore(state);
+      // Check if match, if we got a candidate on the asked for 'kv' row.
+      // Process each store file. Run through from newest to oldest.
       Map<Long, StoreFile> m = this.storefiles.descendingMap();
-      for (Map.Entry<Long, StoreFile> e: m.entrySet()) {
+      for (Map.Entry<Long, StoreFile> e : m.entrySet()) {
         // Update the candidate keys from the current map file
-        rowAtOrBeforeFromStoreFile(e.getValue(), targetkey, candidates,
-          deletes, now);
+        rowAtOrBeforeFromStoreFile(e.getValue(), state);
       }
-      // Return the best key from candidateKeys
-      return candidates.isEmpty()? null: candidates.last();
+      return state.getCandidate();
     } finally {
       this.lock.readLock().unlock();
     }
   }
 
   /*
-   * Check an individual MapFile for the row at or before a given key 
-   * and timestamp
+   * Check an individual MapFile for the row at or before a given row.
    * @param f
-   * @param targetkey
-   * @param candidates Pass a Set with a Comparator that
-   * ignores key Type so we can do Set.remove using a delete, i.e. a KeyValue
-   * with a different Type to the candidate key.
+   * @param state
    * @throws IOException
    */
   private void rowAtOrBeforeFromStoreFile(final StoreFile f,
-    final KeyValue targetkey, final NavigableSet<KeyValue> candidates,
-    final NavigableSet<KeyValue> deletes, final long now)
-  throws IOException {
-    // if there aren't any candidate keys yet, we'll do some things different 
-    if (candidates.isEmpty()) {
-      rowAtOrBeforeCandidate(f, targetkey, candidates, deletes, now);
-    } else {
-      rowAtOrBeforeWithCandidates(f, targetkey, candidates, deletes, now);
-    }
-  }
-
-  /* 
-   * @param ttlSetting
-   * @param hsk
-   * @param now
-   * @param deletes A Set whose Comparator ignores Type.
-   * @return True if key has not expired and is not in passed set of deletes.
-   */
-  static boolean notExpiredAndNotInDeletes(final long ttl,
-      final KeyValue key, final long now, final Set<KeyValue> deletes) {
-    return !isExpired(key, now-ttl) && (deletes == null || deletes.isEmpty() ||
-        !deletes.contains(key));
-  }
-
-  static boolean isExpired(final KeyValue key, final long oldestTimestamp) {
-    return key.getTimestamp() < oldestTimestamp;
-  }
-
-  /* Find a candidate for row that is at or before passed key, searchkey, in hfile.
-   * @param f
-   * @param targetkey Key to go search the hfile with.
-   * @param candidates
-   * @param now
-   * @throws IOException
-   * @see {@link #rowAtOrBeforeCandidate(HStoreKey, org.apache.hadoop.io.MapFile.Reader, byte[], SortedMap, long)}
-   */
-  private void rowAtOrBeforeCandidate(final StoreFile f,
-    final KeyValue targetkey, final NavigableSet<KeyValue> candidates,
-    final NavigableSet<KeyValue> deletes, final long now)
+    final GetClosestRowBeforeTracker state)
   throws IOException {
-    KeyValue search = targetkey;
-    // If the row we're looking for is past the end of this mapfile, set the
-    // search key to be the last key.  If its a deleted key, then we'll back
-    // up to the row before and return that.
-    // TODO: Cache last key as KV over in the file.
     Reader r = f.getReader();
     if (r == null) {
       LOG.warn("StoreFile " + f + " has a null Reader");
       return;
     }
-    byte [] lastkey = r.getLastKey();
-    KeyValue lastKeyValue =
-      KeyValue.createKeyValueFromKey(lastkey, 0, lastkey.length);
-    if (this.comparator.compareRows(lastKeyValue, targetkey) < 0) {
-      search = lastKeyValue;
+    // TODO: Cache these keys rather than make each time?
+    byte [] fk = r.getFirstKey();
+    KeyValue firstKV = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
+    byte [] lk = r.getLastKey();
+    KeyValue lastKV = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
+    KeyValue firstOnRow = state.getTargetKey();
+    if (this.comparator.compareRows(lastKV, firstOnRow) < 0) {
+      // If last key in file is not of the target table, no candidates in this
+      // file.  Return.
+      if (!state.isTargetTable(lastKV)) return;
+      // If the row we're looking for is past the end of file, set search key to
+      // last key. TODO: Cache last and first key rather than make each time.
+      firstOnRow = new KeyValue(lastKV.getRow(), HConstants.LATEST_TIMESTAMP);
     }
-    KeyValue knownNoGoodKey = null;
     HFileScanner scanner = r.getScanner();
-    for (boolean foundCandidate = false; !foundCandidate;) {
-      // Seek to the exact row, or the one that would be immediately before it
-      int result = scanner.seekTo(search.getBuffer(), search.getKeyOffset(),
-        search.getKeyLength());
-      if (result < 0) {
-        // Not in file.
-        break;
-      }
-      KeyValue deletedOrExpiredRow = null;
-      KeyValue kv = null;
-      do {
-        kv = scanner.getKeyValue();
-        if (this.comparator.compareRows(kv, search) <= 0) {
-          if (!kv.isDeleteType()) {
-            if (handleNonDelete(kv, now, deletes, candidates)) {
-              foundCandidate = true;
-              // NOTE! Continue.
-              continue;
-            }
-          }
-          deletes.add(kv);
-          if (deletedOrExpiredRow == null) {
-            deletedOrExpiredRow = kv;
-          }
-        } else if (this.comparator.compareRows(kv, search) > 0) {
-          // if the row key we just read is beyond the key we're searching for,
-          // then we're done.
-          break;
-        } else {
-          // So, the row key doesn't match, but we haven't gone past the row
-          // we're seeking yet, so this row is a candidate for closest
-          // (assuming that it isn't a delete).
-          if (!kv.isDeleteType()) {
-            if (handleNonDelete(kv, now, deletes, candidates)) {
-              foundCandidate = true;
-              // NOTE: Continue
-              continue;
-            }
-          }
-          deletes.add(kv);
-          if (deletedOrExpiredRow == null) {
-            deletedOrExpiredRow = kv;
-          }
-        }
-      } while(scanner.next() && (knownNoGoodKey == null ||
-          this.comparator.compare(kv, knownNoGoodKey) < 0));
-
-      // If we get here and have no candidates but we did find a deleted or
-      // expired candidate, we need to look at the key before that
-      if (!foundCandidate && deletedOrExpiredRow != null) {
-        knownNoGoodKey = deletedOrExpiredRow;
-        if (!scanner.seekBefore(deletedOrExpiredRow.getBuffer(),
-            deletedOrExpiredRow.getKeyOffset(),
-            deletedOrExpiredRow.getKeyLength())) {
-          // Not in file -- what can I do now but break?
-          break;
-        }
-        search = scanner.getKeyValue();
-      } else {
-        // No candidates and no deleted or expired candidates. Give up.
-        break;
-      }
+    // Seek scanner.  If can't seek it, return.
+    if (!seekToScanner(scanner, firstOnRow, firstKV)) return;
+    // If we found candidate on firstOnRow, just return. THIS WILL NEVER HAPPEN!
+    // Unlikely that there'll be an instance of actual first row in table.
+    if (walkForwardInSingleRow(scanner, firstOnRow, state)) return;
+    // If here, need to start backing up.
+    while (scanner.seekBefore(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(),
+       firstOnRow.getKeyLength())) {
+      KeyValue kv = scanner.getKeyValue();
+      if (!state.isTargetTable(kv)) break;
+      if (!state.isBetterCandidate(kv)) break;
+      // Make new first on row.
+      firstOnRow = new KeyValue(kv.getRow(), HConstants.LATEST_TIMESTAMP);
+      // Seek scanner.  If can't seek it, break.
+      if (!seekToScanner(scanner, firstOnRow, firstKV)) break;
+      // If we find something, break;
+      if (walkForwardInSingleRow(scanner, firstOnRow, state)) break;
     }
-    
-    // Arriving here just means that we consumed the whole rest of the map
-    // without going "past" the key we're searching for. we can just fall
-    // through here.
   }
 
-  private void rowAtOrBeforeWithCandidates(final StoreFile f,
-    final KeyValue targetkey,
-    final NavigableSet<KeyValue> candidates,
-    final NavigableSet<KeyValue> deletes, final long now) 
+  /*
+   * Seek the file scanner to firstOnRow or first entry in file.
+   * @param scanner
+   * @param firstOnRow
+   * @param firstKV
+   * @return True if we successfully seeked scanner.
+   * @throws IOException
+   */
+  private boolean seekToScanner(final HFileScanner scanner,
+    final KeyValue firstOnRow, final KeyValue firstKV)
   throws IOException {
-    // if there are already candidate keys, we need to start our search 
-    // at the earliest possible key so that we can discover any possible
-    // deletes for keys between the start and the search key.  Back up to start
-    // of the row in case there are deletes for this candidate in this mapfile
-    // BUT do not backup before the first key in the store file.
-    KeyValue firstCandidateKey = candidates.first();
-    KeyValue search = null;
-    if (this.comparator.compareRows(firstCandidateKey, targetkey) < 0) {
-      search = targetkey;
-    } else {
-      search = firstCandidateKey;
-    }
+    KeyValue kv = firstOnRow;
+    // If firstOnRow < firstKV, set to firstKV
+    if (this.comparator.compareRows(firstKV, firstOnRow) == 0) kv = firstKV;
+    int result = scanner.seekTo(kv.getBuffer(), kv.getKeyOffset(),
+      kv.getKeyLength());
+    return result >= 0;
+  }
 
-    // Seek to the exact row, or the one that would be immediately before it
-    Reader r = f.getReader();
-    if (r == null) {
-      LOG.warn("StoreFile " + f + " has a null Reader");
-      return;
-    }
-    HFileScanner scanner = r.getScanner();
-    int result = scanner.seekTo(search.getBuffer(), search.getKeyOffset(),
-      search.getKeyLength());
-    if (result < 0) {
-      // Key is before start of this file.  Return.
-      return;
-    }
+  /*
+   * When we come in here, we are probably at the kv just before we break into
+   * the row that firstOnRow is on.  Usually need to increment one time to get
+   * on to the row we are interested in.
+   * @param scanner
+   * @param firstOnRow
+   * @param state
+   * @return True we found a candidate.
+   * @throws IOException
+   */
+  private boolean walkForwardInSingleRow(final HFileScanner scanner,
+    final KeyValue firstOnRow, final GetClosestRowBeforeTracker state)
+  throws IOException {
+    boolean foundCandidate = false;
     do {
       KeyValue kv = scanner.getKeyValue();
-      // if we have an exact match on row, and it's not a delete, save this
-      // as a candidate key
-      if (this.comparator.matchingRows(kv, targetkey)) {
-        handleKey(kv, now, deletes, candidates);
-      } else if (this.comparator.compareRows(kv, targetkey) > 0 ) {
-        // if the row key we just read is beyond the key we're searching for,
-        // then we're done.
+      // If we are not in the row, skip.
+      if (this.comparator.compareRows(kv, firstOnRow) < 0) continue;
+      // Did we go beyond the target row? If so break.
+      if (state.isTooFar(kv, firstOnRow)) break;
+      if (state.isExpired(kv)) {
+        continue;
+      }
+      // If we added something, this row is a contender. break.
+      if (state.handle(kv)) {
+        foundCandidate = true;
         break;
-      } else {
-        // So, the row key doesn't match, but we haven't gone past the row
-        // we're seeking yet, so this row is a candidate for closest 
-        // (assuming that it isn't a delete).
-        handleKey(kv, now, deletes, candidates);
       }
     } while(scanner.next());
-  }
-
-  /*
-   * Used calculating keys at or just before a passed key.
-   * @param readkey
-   * @param now
-   * @param deletes Set with Comparator that ignores key type.
-   * @param candidate Set with Comprator that ignores key type.
-   */
-  private void handleKey(final KeyValue readkey, final long now,
-      final NavigableSet<KeyValue> deletes,
-      final NavigableSet<KeyValue> candidates) {
-    if (!readkey.isDeleteType()) {
-      handleNonDelete(readkey, now, deletes, candidates);
-    } else {
-      handleDeletes(readkey, candidates, deletes);
-    }
-  }
-
-  /*
-   * Used calculating keys at or just before a passed key.
-   * @param readkey
-   * @param now
-   * @param deletes Set with Comparator that ignores key type.
-   * @param candidates Set with Comparator that ignores key type.
-   * @return True if we added a candidate.
-   */
-  private boolean handleNonDelete(final KeyValue readkey, final long now,
-      final NavigableSet<KeyValue> deletes,
-      final NavigableSet<KeyValue> candidates) {
-    if (notExpiredAndNotInDeletes(this.ttl, readkey, now, deletes)) {
-      candidates.add(readkey);
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * Handle keys whose values hold deletes.
-   * Add to the set of deletes and then if the candidate keys contain any that
-   * might match, then check for a match and remove it.  Implies candidates
-   * is made with a Comparator that ignores key type.
-   * @param k
-   * @param candidates
-   * @param deletes
-   * @return True if we removed <code>k</code> from <code>candidates</code>.
-   */
-  static boolean handleDeletes(final KeyValue k,
-      final NavigableSet<KeyValue> candidates,
-      final NavigableSet<KeyValue> deletes) {
-    deletes.add(k);
-    return candidates.remove(k);
+    return foundCandidate;
   }
 
   /**

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=805063&r1=805062&r2=805063&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Mon Aug 17 17:34:19 2009
@@ -145,7 +145,6 @@
    * @return true if there are more rows, false if scanner is done
    */
   public synchronized boolean next(List<KeyValue> outResult) throws IOException {
-    List<KeyValue> results = new ArrayList<KeyValue>();
     KeyValue peeked = this.heap.peek();
     if (peeked == null) {
       close();
@@ -153,6 +152,7 @@
     }
     matcher.setRow(peeked.getRow());
     KeyValue kv;
+    List<KeyValue> results = new ArrayList<KeyValue>();
     while((kv = this.heap.peek()) != null) {
       QueryMatcher.MatchCode qcode = matcher.match(kv);
       switch(qcode) {
@@ -162,7 +162,6 @@
           continue;
           
         case DONE:
-
           // copy jazz
           outResult.addAll(results);
           return true;
@@ -198,7 +197,6 @@
     if (!results.isEmpty()) {
       // copy jazz
       outResult.addAll(results);
-
       return true;
     }
 

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Bytes.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Bytes.java?rev=805063&r1=805062&r2=805063&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Bytes.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Bytes.java Mon Aug 17 17:34:19 2009
@@ -242,7 +242,7 @@
    * @return String made from <code>b</code>
    */
   public static String toString(final byte [] b) {
-    if(b == null) {
+    if (b == null) {
       return null;
     }
     return toString(b, 0, b.length);

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestCase.java?rev=805063&r1=805062&r2=805063&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestCase.java Mon Aug 17 17:34:19 2009
@@ -62,8 +62,7 @@
   protected final static byte [] fam1 = Bytes.toBytes("colfamily1");
   protected final static byte [] fam2 = Bytes.toBytes("colfamily2");
   protected final static byte [] fam3 = Bytes.toBytes("colfamily3");
-  protected static final byte [][] COLUMNS = {fam1,
-    fam2, fam3};
+  protected static final byte [][] COLUMNS = {fam1, fam2, fam3};
 
   private boolean localfs = false;
   protected Path testDir = null;

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=805063&r1=805062&r2=805063&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java Mon Aug 17 17:34:19 2009
@@ -258,8 +258,8 @@
 
   private void createSmallerStoreFile(final HRegion region) throws IOException {
     HRegionIncommon loader = new HRegionIncommon(region); 
-    addContent(loader, Bytes.toString(COLUMN_FAMILY),
-        ("bbb").getBytes(), null);
+    addContent(loader, Bytes.toString(COLUMN_FAMILY), ("" +
+    		"bbb").getBytes(), null);
     loader.flushcache();
   }
 }

Added: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestGetClosestAtOrBefore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestGetClosestAtOrBefore.java?rev=805063&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestGetClosestAtOrBefore.java (added)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestGetClosestAtOrBefore.java Mon Aug 17 17:34:19 2009
@@ -0,0 +1,350 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TestGet;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+/**
+ * {@link TestGet} is a medley of tests of get all done up as a single test.
+ * This class 
+ */
+public class TestGetClosestAtOrBefore extends HBaseTestCase implements HConstants {
+  static final Log LOG = LogFactory.getLog(TestGetClosestAtOrBefore.class);
+  private MiniDFSCluster miniHdfs;
+  
+  private static final byte [] T00 = Bytes.toBytes("000");
+  private static final byte [] T10 = Bytes.toBytes("010");
+  private static final byte [] T11 = Bytes.toBytes("011");
+  private static final byte [] T12 = Bytes.toBytes("012");
+  private static final byte [] T20 = Bytes.toBytes("020");
+  private static final byte [] T30 = Bytes.toBytes("030");
+  private static final byte [] T31 = Bytes.toBytes("031");
+  private static final byte [] T35 = Bytes.toBytes("035");
+  private static final byte [] T40 = Bytes.toBytes("040");
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    this.miniHdfs = new MiniDFSCluster(this.conf, 1, true, null);
+    // Set the hbase.rootdir to be the home directory in mini dfs.
+    this.conf.set(HConstants.HBASE_DIR,
+      this.miniHdfs.getFileSystem().getHomeDirectory().toString());
+  }
+
+  public void testUsingMetaAndBinary() throws IOException {
+    FileSystem filesystem = FileSystem.get(conf);
+    Path rootdir = filesystem.makeQualified(new Path(conf.get(HConstants.HBASE_DIR)));
+    filesystem.mkdirs(rootdir);
+    // Up flush size else we bind up when we use default catalog flush of 16k.
+    HRegionInfo.FIRST_META_REGIONINFO.getTableDesc().
+      setMemStoreFlushSize(64 * 1024 * 1024);
+    HRegion mr = HRegion.createHRegion(HRegionInfo.FIRST_META_REGIONINFO,
+      rootdir, this.conf);
+    // Write rows for three tables 'A', 'B', and 'C'.
+    for (char c = 'A'; c < 'D'; c++) {
+      HTableDescriptor htd = new HTableDescriptor("" + c);
+      final int last = 128;
+      final int interval = 2;
+      for (int i = 0; i <= last; i += interval) {
+        HRegionInfo hri = new HRegionInfo(htd,
+          i == 0? HConstants.EMPTY_BYTE_ARRAY: Bytes.toBytes((byte)i),
+          i == last? HConstants.EMPTY_BYTE_ARRAY: Bytes.toBytes((byte)i + interval));
+        Put put = new Put(hri.getRegionName());
+        put.add(CATALOG_FAMILY, REGIONINFO_QUALIFIER, Writables.getBytes(hri));
+        mr.put(put, false);
+      }
+    }
+    InternalScanner s = mr.getScanner(new Scan());
+    try {
+      List<KeyValue> keys = new ArrayList<KeyValue>();
+      while(s.next(keys)) {
+        LOG.info(keys);
+        keys.clear();
+      }
+    } finally {
+      s.close();
+    }
+    findRow(mr, 'C', 44, 44);
+    findRow(mr, 'C', 45, 44);
+    findRow(mr, 'C', 46, 46);
+    findRow(mr, 'C', 43, 42);
+    mr.flushcache();
+    findRow(mr, 'C', 44, 44);
+    findRow(mr, 'C', 45, 44);
+    findRow(mr, 'C', 46, 46);
+    findRow(mr, 'C', 43, 42);
+    // Now delete 'C' and make sure I don't get entries from 'B'.
+    byte [] firstRowInC = HRegionInfo.createRegionName(Bytes.toBytes("" + 'C'),
+      HConstants.EMPTY_BYTE_ARRAY, HConstants.ZEROES);
+    Scan scan = new Scan(firstRowInC);
+    s = mr.getScanner(scan);
+    try {
+      List<KeyValue> keys = new ArrayList<KeyValue>();
+      while (s.next(keys)) {
+        mr.delete(new Delete(keys.get(0).getRow()), null, false);
+        keys.clear();
+      }
+    } finally {
+      s.close();
+    }
+    // Assert we get null back (pass -1).
+    findRow(mr, 'C', 44, -1);
+    findRow(mr, 'C', 45, -1);
+    findRow(mr, 'C', 46, -1);
+    findRow(mr, 'C', 43, -1);
+    mr.flushcache();
+    findRow(mr, 'C', 44, -1);
+    findRow(mr, 'C', 45, -1);
+    findRow(mr, 'C', 46, -1);
+    findRow(mr, 'C', 43, -1);
+  }
+
+  /*
+   * @param mr
+   * @param table
+   * @param rowToFind
+   * @param answer Pass -1 if we're not to find anything.
+   * @return Row found.
+   * @throws IOException
+   */
+  private byte [] findRow(final HRegion mr, final char table,
+    final int rowToFind, final int answer)
+  throws IOException {
+    byte [] tableb = Bytes.toBytes("" + table);
+    // Find the row.
+    byte [] tofindBytes = Bytes.toBytes((short)rowToFind);
+    byte [] metaKey = HRegionInfo.createRegionName(tableb, tofindBytes,
+      HConstants.NINES);
+    LOG.info("find=" + new String(metaKey));
+    Result r = mr.getClosestRowBefore(metaKey);
+    if (answer == -1) {
+      assertNull(r);
+      return null;
+    }
+    assertTrue(Bytes.compareTo(Bytes.toBytes((short)answer),
+      extractRowFromMetaRow(r.getRow())) == 0);
+    return r.getRow();
+  }
+
+  private byte [] extractRowFromMetaRow(final byte [] b) {
+    int firstDelimiter = KeyValue.getDelimiter(b, 0, b.length,
+      HRegionInfo.DELIMITER);
+    int lastDelimiter = KeyValue.getDelimiterInReverse(b, 0, b.length,
+      HRegionInfo.DELIMITER);
+    int length = lastDelimiter - firstDelimiter - 1;
+    byte [] row = new byte[length];
+    System.arraycopy(b, firstDelimiter + 1, row, 0, length);
+    return row;
+  }
+
+  /**
+   * Test file of multiple deletes and with deletes as final key.
+   * @see <a href="https://issues.apache.org/jira/browse/HBASE-751">HBASE-751</a>
+   */
+  public void testGetClosestRowBefore3() throws IOException{
+    HRegion region = null;
+    byte [] c0 = COLUMNS[0];
+    byte [] c1 = COLUMNS[1];
+    try {
+      HTableDescriptor htd = createTableDescriptor(getName());
+      region = createNewHRegion(htd, null, null);
+      
+      Put p = new Put(T00);
+      p.add(c0, c0, T00);
+      region.put(p);
+      
+      p = new Put(T10);
+      p.add(c0, c0, T10);
+      region.put(p);
+      
+      p = new Put(T20);
+      p.add(c0, c0, T20);
+      region.put(p);
+      
+      Result r = region.getClosestRowBefore(T20, c0);
+      assertTrue(Bytes.equals(T20, r.getRow()));
+      
+      Delete d = new Delete(T20);
+      d.deleteColumn(c0, c0);
+      region.delete(d, null, false);
+      
+      r = region.getClosestRowBefore(T20, c0);
+      assertTrue(Bytes.equals(T10, r.getRow()));
+      
+      p = new Put(T30);
+      p.add(c0, c0, T30);
+      region.put(p);
+      
+      r = region.getClosestRowBefore(T30, c0);
+      assertTrue(Bytes.equals(T30, r.getRow()));
+      
+      d = new Delete(T30);
+      d.deleteColumn(c0, c0);
+      region.delete(d, null, false);
+
+      r = region.getClosestRowBefore(T30, c0);
+      assertTrue(Bytes.equals(T10, r.getRow()));
+      r = region.getClosestRowBefore(T31, c0);
+      assertTrue(Bytes.equals(T10, r.getRow()));
+
+      region.flushcache();
+
+      // try finding "010" after flush
+      r = region.getClosestRowBefore(T30, c0);
+      assertTrue(Bytes.equals(T10, r.getRow()));
+      r = region.getClosestRowBefore(T31, c0);
+      assertTrue(Bytes.equals(T10, r.getRow()));
+      
+      // Put into a different column family.  Should make it so I still get t10
+      p = new Put(T20);
+      p.add(c1, c1, T20);
+      region.put(p);
+
+      r = region.getClosestRowBefore(T30, c0);
+      assertTrue(Bytes.equals(T10, r.getRow()));
+      r = region.getClosestRowBefore(T31, c0);
+      assertTrue(Bytes.equals(T10, r.getRow()));
+      
+      region.flushcache();
+      
+      r = region.getClosestRowBefore(T30, c0);
+      assertTrue(Bytes.equals(T10, r.getRow()));
+      r = region.getClosestRowBefore(T31, c0);
+      assertTrue(Bytes.equals(T10, r.getRow()));
+      
+      // Now try combo of memcache and mapfiles.  Delete the t20 COLUMS[1]
+      // in memory; make sure we get back t10 again.
+      d = new Delete(T20);
+      d.deleteColumn(c1, c1);
+      region.delete(d, null, false);
+      r = region.getClosestRowBefore(T30, c0);
+      assertTrue(Bytes.equals(T10, r.getRow()));
+      
+      // Ask for a value off the end of the file.  Should return t10.
+      r = region.getClosestRowBefore(T31, c0);
+      assertTrue(Bytes.equals(T10, r.getRow()));
+      region.flushcache();
+      r = region.getClosestRowBefore(T31, c0);
+      assertTrue(Bytes.equals(T10, r.getRow()));
+      
+      // Ok.  Let the candidate come out of hfile but have delete of
+      // the candidate be in memory.
+      p = new Put(T11);
+      p.add(c0, c0, T11);
+      region.put(p);
+      d = new Delete(T10);
+      d.deleteColumn(c1, c1);
+      r = region.getClosestRowBefore(T12, c0);
+      assertTrue(Bytes.equals(T11, r.getRow()));
+    } finally {
+      if (region != null) {
+        try {
+          region.close();
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+        region.getLog().closeAndDelete();
+      }
+    }
+  }
+
+  /** For HBASE-694 */
+  public void testGetClosestRowBefore2() throws IOException{
+    HRegion region = null;
+    byte [] c0 = COLUMNS[0];
+    try {
+      HTableDescriptor htd = createTableDescriptor(getName());
+      region = createNewHRegion(htd, null, null);
+      
+      Put p = new Put(T10);
+      p.add(c0, c0, T10);
+      region.put(p);
+      
+      p = new Put(T30);
+      p.add(c0, c0, T30);
+      region.put(p);
+      
+      p = new Put(T40);
+      p.add(c0, c0, T40);
+      region.put(p);
+
+      // try finding "035"
+      Result r = region.getClosestRowBefore(T35, c0);
+      assertTrue(Bytes.equals(T30, r.getRow()));
+
+      region.flushcache();
+
+      // try finding "035"
+      r = region.getClosestRowBefore(T35, c0);
+      assertTrue(Bytes.equals(T30, r.getRow()));
+
+      p = new Put(T20);
+      p.add(c0, c0, T20);
+      region.put(p);
+      
+      // try finding "035"
+      r = region.getClosestRowBefore(T35, c0);
+      assertTrue(Bytes.equals(T30, r.getRow()));
+      
+      region.flushcache();
+
+      // try finding "035"
+      r = region.getClosestRowBefore(T35, c0);
+      assertTrue(Bytes.equals(T30, r.getRow()));
+    } finally {
+      if (region != null) {
+        try {
+          region.close();
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+        region.getLog().closeAndDelete();
+      }
+    }
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    if (this.miniHdfs != null) {
+      this.miniHdfs.shutdown();
+    }
+    super.tearDown();
+  }
+}
\ No newline at end of file

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=805063&r1=805062&r2=805063&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java Mon Aug 17 17:34:19 2009
@@ -22,7 +22,6 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
@@ -35,7 +34,6 @@
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java?rev=805063&r1=805062&r2=805063&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java Mon Aug 17 17:34:19 2009
@@ -42,7 +42,7 @@
   private final Log LOG = LogFactory.getLog(this.getClass());
   private MemStore memstore;
   private static final int ROW_COUNT = 10;
-  private static final int QUALIFIER_COUNT = 10;
+  private static final int QUALIFIER_COUNT = ROW_COUNT;
   private static final byte [] FAMILY = Bytes.toBytes("column");
   private static final byte [] CONTENTS_BASIC = Bytes.toBytes("contents:basic");
   private static final String CONTENTSTR = "contentstr";
@@ -82,6 +82,8 @@
       while (s.next(result)) {
         LOG.info(result);
         count++;
+        // Row count is same as column count.
+        assertEquals(rowCount, result.size());
         result.clear();
       }
     } finally {
@@ -98,6 +100,8 @@
         // Assert the stuff is coming out in right order.
         assertTrue(Bytes.compareTo(Bytes.toBytes(count), result.get(0).getRow()) == 0);
         count++;
+        // Row count is same as column count.
+        assertEquals(rowCount, result.size());
         if (count == 2) {
           this.memstore.snapshot();
           LOG.info("Snapshotted");
@@ -108,6 +112,34 @@
       s.close();
     }
     assertEquals(rowCount, count);
+    // Assert that new values are seen in kvset as we scan.
+    long ts = System.currentTimeMillis();
+    s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
+      this.memstore.comparator, null, memstorescanners);
+    count = 0;
+    int snapshotIndex = 5;
+    try {
+      while (s.next(result)) {
+        LOG.info(result);
+        // Assert the stuff is coming out in right order.
+        assertTrue(Bytes.compareTo(Bytes.toBytes(count), result.get(0).getRow()) == 0);
+        // Row count is same as column count.
+        // TODO PUTBACK assertEquals("count=" + count + ", result=" + result,
+        //  rowCount, result.size());
+        count++;
+        if (count == snapshotIndex) {
+          this.memstore.snapshot();
+          this.memstore.clearSnapshot(this.memstore.getSnapshot());
+          // Added more rows into kvset.
+          addRows(this.memstore, ts);
+          LOG.info("Snapshotted, cleared it and then added values");
+        }
+        result.clear();
+      }
+    } finally {
+      s.close();
+    }
+    assertEquals(rowCount, count);
   }
 
   /** 
@@ -126,7 +158,7 @@
   }
 
   public void testMultipleVersionsSimple() throws Exception {
-    MemStore m = new MemStore(HConstants.FOREVER, KeyValue.COMPARATOR);
+    MemStore m = new MemStore(KeyValue.COMPARATOR);
     byte [] row = Bytes.toBytes("testRow");
     byte [] family = Bytes.toBytes("testFamily");
     byte [] qf = Bytes.toBytes("testQualifier");
@@ -146,7 +178,7 @@
   }
 
   public void testBinary() throws IOException {
-    MemStore mc = new MemStore(HConstants.FOREVER, KeyValue.ROOT_COMPARATOR);
+    MemStore mc = new MemStore(KeyValue.ROOT_COMPARATOR);
     final int start = 43;
     final int end = 46;
     for (int k = start; k <= end; k++) {
@@ -180,66 +212,7 @@
   //////////////////////////////////////////////////////////////////////////////
   // Get tests
   //////////////////////////////////////////////////////////////////////////////
-  /** For HBASE-528 */
-  public void testGetRowKeyAtOrBefore() {
-    // set up some test data
-    byte [] t10 = Bytes.toBytes("010");
-    byte [] t20 = Bytes.toBytes("020");
-    byte [] t30 = Bytes.toBytes("030");
-    byte [] t35 = Bytes.toBytes("035");
-    byte [] t40 = Bytes.toBytes("040");
-    
-    memstore.add(getKV(t10, "t10 bytes".getBytes()));
-    memstore.add(getKV(t20, "t20 bytes".getBytes()));
-    memstore.add(getKV(t30, "t30 bytes".getBytes()));
-    memstore.add(getKV(t35, "t35 bytes".getBytes()));
-    // write a delete in there to see if things still work ok
-    memstore.add(getDeleteKV(t35));
-    memstore.add(getKV(t40, "t40 bytes".getBytes()));
-    
-    NavigableSet<KeyValue> results = null;
-    
-    // try finding "015"
-    results =
-      new TreeSet<KeyValue>(this.memstore.comparator.getComparatorIgnoringType());
-    KeyValue t15 = new KeyValue(Bytes.toBytes("015"),
-      System.currentTimeMillis());
-    memstore.getRowKeyAtOrBefore(t15, results);
-    KeyValue kv = results.last();
-    assertTrue(KeyValue.COMPARATOR.compareRows(kv, t10) == 0);
-
-    // try "020", we should get that row exactly
-    results =
-      new TreeSet<KeyValue>(this.memstore.comparator.getComparatorIgnoringType());
-    memstore.getRowKeyAtOrBefore(new KeyValue(t20, System.currentTimeMillis()),
-      results);
-    assertTrue(KeyValue.COMPARATOR.compareRows(results.last(), t20) == 0);
-
-    // try "030", we should get that row exactly
-    results =
-      new TreeSet<KeyValue>(this.memstore.comparator.getComparatorIgnoringType());
-    memstore.getRowKeyAtOrBefore(new KeyValue(t30, System.currentTimeMillis()),
-      results);
-    assertTrue(KeyValue.COMPARATOR.compareRows(results.last(), t30) == 0);
-  
-    // try "038", should skip the deleted "035" and give "030"
-    results =
-      new TreeSet<KeyValue>(this.memstore.comparator.getComparatorIgnoringType());
-    byte [] t38 = Bytes.toBytes("038");
-    memstore.getRowKeyAtOrBefore(new KeyValue(t38, System.currentTimeMillis()),
-      results);
-    assertTrue(KeyValue.COMPARATOR.compareRows(results.last(), t30) == 0);
-  
-    // try "050", should get stuff from "040"
-    results =
-      new TreeSet<KeyValue>(this.memstore.comparator.getComparatorIgnoringType());
-    byte [] t50 = Bytes.toBytes("050");
-    memstore.getRowKeyAtOrBefore(new KeyValue(t50, System.currentTimeMillis()),
-      results);
-    assertTrue(KeyValue.COMPARATOR.compareRows(results.last(), t40) == 0);
-  }
-  
-  
+
   /** Test getNextRow from memstore
    * @throws InterruptedException 
    */
@@ -606,8 +579,19 @@
    * @throws IOException 
    */
   private int addRows(final MemStore hmc) {
+    return addRows(hmc, HConstants.LATEST_TIMESTAMP);
+  }
+  
+  /**
+   * Adds {@link #ROW_COUNT} rows and {@link #COLUMNS_COUNT}
+   * @param hmc Instance to add rows to.
+   * @return How many rows we added.
+   * @throws IOException 
+   */
+  private int addRows(final MemStore hmc, final long ts) {
     for (int i = 0; i < ROW_COUNT; i++) {
-      long timestamp = System.currentTimeMillis();
+      long timestamp = ts == HConstants.LATEST_TIMESTAMP?
+        System.currentTimeMillis(): ts;
       for (int ii = 0; ii < QUALIFIER_COUNT; ii++) {
         byte [] row = Bytes.toBytes(i);
         byte [] qf = makeQualifier(i, ii);

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=805063&r1=805062&r2=805063&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java Mon Aug 17 17:34:19 2009
@@ -406,5 +406,4 @@
     long storeTs = results.get(0).getTimestamp();
     assertTrue(icvTs != storeTs);
   }
-  
-}
+}
\ No newline at end of file