You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2008/08/01 22:59:52 UTC

svn commit: r681822 - in /hadoop/hbase/trunk/src: java/org/apache/hadoop/hbase/ java/org/apache/hadoop/hbase/regionserver/ test/org/apache/hadoop/hbase/regionserver/

Author: stack
Date: Fri Aug  1 13:59:52 2008
New Revision: 681822

URL: http://svn.apache.org/viewvc?rev=681822&view=rev
Log:
Patches to alleviate HBASE-790 and HBASE-792

Modified:
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HStoreKey.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestGet2.java

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HStoreKey.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HStoreKey.java?rev=681822&r1=681821&r2=681822&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HStoreKey.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HStoreKey.java Fri Aug  1 13:59:52 2008
@@ -263,8 +263,8 @@
   /** {@inheritDoc} */
   @Override
   public int hashCode() {
-    int result = this.row.hashCode();
-    result ^= this.column.hashCode();
+    int result = Bytes.hashCode(this.row);
+    result ^= Bytes.hashCode(this.column);
     result ^= this.timestamp;
     return result;
   }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=681822&r1=681821&r2=681822&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Aug  1 13:59:52 2008
@@ -317,7 +317,7 @@
     new ConcurrentHashMap<Integer, byte []>();
   private final Map<Integer, TreeMap<HStoreKey, byte []>> targetColumns =
       new ConcurrentHashMap<Integer, TreeMap<HStoreKey, byte []>>();
-  private volatile boolean flushRequested;
+  private volatile boolean flushRequested = false;
   // Default access because read by tests.
   final Map<Integer, HStore> stores = new ConcurrentHashMap<Integer, HStore>();
   final AtomicLong memcacheSize = new AtomicLong(0);
@@ -439,7 +439,6 @@
     this.conf = conf;
     this.regionInfo = regionInfo;
     this.flushListener = flushListener;
-    this.flushRequested = false;
     this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
     String encodedNameStr = Integer.toString(this.regionInfo.getEncodedName());
     this.regiondir = new Path(basedir, encodedNameStr);
@@ -1193,10 +1192,9 @@
             storeSet.add(store);
           }
         }
-      }
-      else
+      } else {
         storeSet.addAll(stores.values());
-      
+      }
       // For each column name that is just a column family, open the store
       // related to it and fetch everything for that row. HBASE-631
       // Also remove each store from storeSet so that these stores
@@ -1427,8 +1425,8 @@
    */
   private synchronized void checkResources() {
     boolean blocked = false;
-
-    while (this.memcacheSize.get() >= this.blockingMemcacheSize) {
+    while (this.memcacheSize.get() > this.blockingMemcacheSize) {
+      requestFlush();
       if (!blocked) {
         LOG.info("Blocking updates for '" + Thread.currentThread().getName() +
             "' on region " + Bytes.toString(getRegionName()) + ": Memcache size " +
@@ -1436,7 +1434,6 @@
             " is >= than blocking " +
             StringUtils.humanReadableInt(this.blockingMemcacheSize) + " size");
       }
-
       blocked = true;
       try {
         wait(threadWakeFrequency);
@@ -1610,16 +1607,34 @@
             getStore(key.getColumn()).add(key, e.getValue()));
       }
       flush = this.flushListener != null && !this.flushRequested &&
-        size > this.memcacheFlushSize;
+        isFlushSize(size);
     } finally {
       this.updatesLock.readLock().unlock();
     }
     if (flush) {
       // Request a cache flush.  Do it outside update lock.
-      this.flushListener.request(this);
-      this.flushRequested = true;
+      requestFlush();
     }
   }
+
+  private void requestFlush() {
+    if (this.flushListener == null || this.flushRequested) {
+      return;
+    }
+    this.flushListener.request(this);
+    this.flushRequested = true;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Flush requested on " + this);
+    }
+  }
+
+  /*
+   * @param size
+   * @return True if size is over the flush threshold
+   */
+  private boolean isFlushSize(final long size) {
+    return size > this.memcacheFlushSize;
+  }
   
   // Do any reconstruction needed from the log
   @SuppressWarnings("unused")

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=681822&r1=681821&r2=681822&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java Fri Aug  1 13:59:52 2008
@@ -608,16 +608,10 @@
           HStoreKey curkey = es.getKey();
           byte[] bytes = es.getValue();
           if (HStoreKey.matchingFamily(this.family.getName(), curkey.getColumn())) {
-            if (ttl == HConstants.FOREVER ||
-                  now < curkey.getTimestamp() + ttl) {
+            if (!isExpired(curkey, ttl, now)) {
               entries++;
               out.append(curkey, new ImmutableBytesWritable(bytes));
               flushed += curkey.getSize() + (bytes == null ? 0 : bytes.length);
-            } else {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("internalFlushCache: " + curkey +
-                  ": expired, skipped");
-              }
             }
           }
         }
@@ -930,12 +924,8 @@
           if (sk.getRow().length != 0 && sk.getColumn().length != 0) {
             // Only write out objects which have a non-zero length key and
             // value
-            if (ttl == HConstants.FOREVER || now < sk.getTimestamp() + ttl) {
+            if (!isExpired(sk, ttl, now)) {
               compactedOut.append(sk, vals[smallestKey]);
-            } else {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("compactHStoreFiles: " + sk + ": expired, deleted");
-              }
             }
           }
         }
@@ -1197,17 +1187,12 @@
             // there aren't any pending deletes.
             if (!(deletes.containsKey(readcol) &&
                 deletes.get(readcol).longValue() >= readkey.getTimestamp())) {
-              if (ttl == HConstants.FOREVER ||
-                      now < readkey.getTimestamp() + ttl) {
+              if (!isExpired(readkey, ttl, now)) {
                 results.put(readcol, 
                   new Cell(readval.get(), readkey.getTimestamp()));
                 // need to reinstantiate the readval so we can reuse it, 
                 // otherwise next iteration will destroy our result
                 readval = new ImmutableBytesWritable();
-              } else {
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("getFullFromMapFile: " + readkey + ": expired, skipped");
-                }
               }
             }
           }
@@ -1259,7 +1244,7 @@
       // the old non-delete cell value in a later store file. If we don't keep
       // around the fact that the cell was deleted in a newer record, we end up
       // returning the old value if user is asking for more than one version.
-      // This List of deletes should not large since we are only keeping rows
+      // This List of deletes should not be large since we are only keeping rows
       // and columns that match those set on the scanner and which have delete
       // values.  If memory usage becomes an issue, could redo as bloom filter.
       Map<byte [], List<Long>> deletes =
@@ -1283,13 +1268,8 @@
             continue;
           }
           if (!isDeleted(readkey, readval.get(), true, deletes)) {
-            if (ttl == HConstants.FOREVER ||
-                    now < readkey.getTimestamp() + ttl) {
+            if (!isExpired(readkey, ttl, now)) {
               results.add(new Cell(readval.get(), readkey.getTimestamp()));
-            } else {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("get: " + readkey + ": expired, skipped");
-              }
             }
             // Perhaps only one version is wanted.  I could let this
             // test happen later in the for loop test but it would cost
@@ -1304,13 +1284,8 @@
               !hasEnoughVersions(numVersions, results);
               readval = new ImmutableBytesWritable()) {
             if (!isDeleted(readkey, readval.get(), true, deletes)) {
-              if (ttl == HConstants.FOREVER ||
-                    now < readkey.getTimestamp() + ttl) {
+              if (!isExpired(readkey, ttl, now)) {
                 results.add(new Cell(readval.get(), readkey.getTimestamp()));
-              } else {
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("get: " + readkey + ": expired, skipped");
-                }
               }
             }
           }
@@ -1393,14 +1368,8 @@
                 // in the memcache
                 if (!isDeleted(readkey, readval.get(), false, null) &&
                     !keys.contains(readkey)) {
-                  if (ttl == HConstants.FOREVER ||
-                        now < readkey.getTimestamp() + ttl) {
+                  if (!isExpired(readkey, ttl, now)) {
                     keys.add(new HStoreKey(readkey));
-                  } else {
-                    if (LOG.isDebugEnabled()) {
-                      LOG.debug("getKeys: " + readkey +
-                          ": expired, skipped");
-                    }
                   }
 
                   // if we've collected enough versions, then exit the loop.
@@ -1439,24 +1408,34 @@
   byte [] getRowKeyAtOrBefore(final byte [] row)
   throws IOException{
     // Map of HStoreKeys that are candidates for holding the row key that
-    // most closely matches what we're looking for. We'll have to update it 
-    // deletes found all over the place as we go along before finally reading
-    // the best key out of it at the end.
+    // most closely matches what we're looking for. We'll have to update it as
+    // deletes are found all over the place as we go along before finally
+    // reading the best key out of it at the end.
     SortedMap<HStoreKey, Long> candidateKeys = new TreeMap<HStoreKey, Long>();
-    // Obtain read lock
+    
+    // Keep a list of deleted cell keys.  We need this because as we go through
+    // the store files, the cell with the delete marker may be in one file and
+    // the old non-delete cell value in a later store file. If we don't keep
+    // around the fact that the cell was deleted in a newer record, we end up
+    // returning the old value if user is asking for more than one version.
+    // This List of deletes should not be large since we are only keeping rows
+    // and columns that match those set on the scanner and which have delete
+    // values.  If memory usage becomes an issue, could redo as bloom filter.
+    Set<HStoreKey> deletes = new HashSet<HStoreKey>();
+    
+    
     this.lock.readLock().lock();
     try {
-      // Process each store file.  Run through from oldest to newest so deletes
-      // have chance to overshadow deleted cells
+      // First go to the memcache.  Pick up deletes and candidates.
+      this.memcache.getRowKeyAtOrBefore(row, candidateKeys, deletes);
+      
+      // Process each store file.  Run through from newest to oldest.
+      // This code below is very close to the body of the getKeys method.
       MapFile.Reader[] maparray = getReaders();
-      for (int i = 0; i < maparray.length; i++) {
+      for(int i = maparray.length - 1; i >= 0; i--) {
         // Update the candidate keys from the current map file
-        rowAtOrBeforeFromMapFile(maparray[i], row, candidateKeys);
+        rowAtOrBeforeFromMapFile(maparray[i], row, candidateKeys, deletes);
       }
-      
-      // Finally, check the memcache
-      this.memcache.getRowKeyAtOrBefore(row, candidateKeys);
-      
       // Return the best key from candidateKeys
       return candidateKeys.isEmpty()? null: candidateKeys.lastKey().getRow();
     } finally {
@@ -1472,8 +1451,9 @@
    * @param candidateKeys
    * @throws IOException
    */
-  private void rowAtOrBeforeFromMapFile(MapFile.Reader map, final byte [] row, 
-    SortedMap<HStoreKey, Long> candidateKeys)
+  private void rowAtOrBeforeFromMapFile(final MapFile.Reader map,
+    final byte [] row, final SortedMap<HStoreKey, Long> candidateKeys,
+    final Set<HStoreKey> deletes)
   throws IOException {
     HStoreKey startKey = new HStoreKey();
     ImmutableBytesWritable startValue = new ImmutableBytesWritable();
@@ -1491,9 +1471,9 @@
       long now = System.currentTimeMillis();
       // if there aren't any candidate keys yet, we'll do some things different 
       if (candidateKeys.isEmpty()) {
-        rowAtOrBeforeCandidate(startKey, map, row, candidateKeys, now);
+        rowAtOrBeforeCandidate(startKey, map, row, candidateKeys, deletes, now);
       } else {
-        rowAtOrBeforeWithCandidates(startKey, map, row, candidateKeys,
+        rowAtOrBeforeWithCandidates(startKey, map, row, candidateKeys, deletes,
           now);
       }
     }
@@ -1510,7 +1490,8 @@
    */
   private void rowAtOrBeforeCandidate(final HStoreKey startKey,
     final MapFile.Reader map, final byte[] row,
-    final SortedMap<HStoreKey, Long> candidateKeys, final long now) 
+    final SortedMap<HStoreKey, Long> candidateKeys,
+    final Set<HStoreKey> deletes, final long now) 
   throws IOException {
     // if the row we're looking for is past the end of this mapfile, set the
     // search key to be the last key.  If its a deleted key, then we'll back
@@ -1525,9 +1506,31 @@
         searchKey = startKey;
       }
     }
-    rowAtOrBeforeCandidate(map, searchKey, candidateKeys, now);
+    rowAtOrBeforeCandidate(map, searchKey, candidateKeys, deletes, now);
   }
-  
+
+  /* 
+   * @param ttlSetting
+   * @param hsk
+   * @param now
+   * @param deletes
+   * @return True if key has not expired and is not in passed set of deletes.
+   */
+  static boolean notExpiredAndNotInDeletes(final long ttl,
+      final HStoreKey hsk, final long now, final Set<HStoreKey> deletes) {
+    return !isExpired(hsk, ttl, now) && !deletes.contains(hsk);
+  }
+  
+  private static boolean isExpired(final HStoreKey hsk, final long ttl,
+      final long now) {
+    boolean result = ttl != HConstants.FOREVER && now > hsk.getTimestamp() + ttl;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("rowAtOrBeforeCandidate 1:" + hsk +
+        ": expired, skipped");
+    }
+    return result;
+  }
+
   /* Find a candidate for row that is at or before passed key, sk, in mapfile.
    * @param map
    * @param sk Key to go search the mapfile with.
@@ -1538,7 +1541,7 @@
    */
   private void rowAtOrBeforeCandidate(final MapFile.Reader map,
     final HStoreKey sk, final SortedMap<HStoreKey, Long> candidateKeys,
-    final long now)
+    final Set<HStoreKey> deletes, final long now)
   throws IOException {
     HStoreKey searchKey = sk;
     HStoreKey readkey = new HStoreKey();
@@ -1557,19 +1560,16 @@
         // as a candidate key
         if (Bytes.equals(readkey.getRow(), searchKey.getRow())) {
           if (!HLogEdit.isDeleted(readval.get())) {
-            if (ttl == HConstants.FOREVER ||
-                now < readkey.getTimestamp() + ttl) {
+            if (notExpiredAndNotInDeletes(this.ttl, readkey, now, deletes)) {
               candidateKeys.put(stripTimestamp(readkey), 
                   new Long(readkey.getTimestamp()));
               foundCandidate = true;
+              // NOTE! Continue.
               continue;
             }
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("rowAtOrBeforeCandidate 1:" + readkey +
-                ": expired, skipped");
-            }
           }
           // Deleted value.
+          deletes.add(readkey);
           if (deletedOrExpiredRow == null) {
             deletedOrExpiredRow = new HStoreKey(readkey);
           }
@@ -1582,18 +1582,14 @@
           // we're seeking yet, so this row is a candidate for closest
           // (assuming that it isn't a delete).
           if (!HLogEdit.isDeleted(readval.get())) {
-            if (ttl == HConstants.FOREVER || 
-                now < readkey.getTimestamp() + ttl) {
+            if (notExpiredAndNotInDeletes(this.ttl, readkey, now, deletes)) {
               candidateKeys.put(stripTimestamp(readkey), 
                   new Long(readkey.getTimestamp()));
               foundCandidate = true;
               continue;
             }
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("rowAtOrBeforeCandidate 2:" + readkey +
-                ": expired, skipped");
-            }
           }
+          deletes.add(readkey);
           if (deletedOrExpiredRow == null) {
             deletedOrExpiredRow = new HStoreKey(readkey);
           }
@@ -1619,7 +1615,8 @@
   
   private void rowAtOrBeforeWithCandidates(final HStoreKey startKey,
     final MapFile.Reader map, final byte[] row,
-    final SortedMap<HStoreKey, Long> candidateKeys, final long now) 
+    final SortedMap<HStoreKey, Long> candidateKeys,
+    final Set<HStoreKey> deletes, final long now) 
   throws IOException {
     HStoreKey readkey = new HStoreKey();
     ImmutableBytesWritable readval = new ImmutableBytesWritable();
@@ -1650,15 +1647,9 @@
       if (Bytes.equals(readkey.getRow(), row)) {
         strippedKey = stripTimestamp(readkey);
         if (!HLogEdit.isDeleted(readval.get())) {
-          if (ttl == HConstants.FOREVER || 
-              now < readkey.getTimestamp() + ttl) {
+          if (notExpiredAndNotInDeletes(this.ttl, readkey, now, deletes)) {
             candidateKeys.put(strippedKey,
                 new Long(readkey.getTimestamp()));
-          } else {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("rowAtOrBeforeWithCandidates 1: " + readkey +
-                  ": expired, skipped");
-            }
           }
         } else {
           // If the candidate keys contain any that might match by timestamp,
@@ -1682,14 +1673,8 @@
         // we're seeking yet, so this row is a candidate for closest 
         // (assuming that it isn't a delete).
         if (!HLogEdit.isDeleted(readval.get())) {
-          if (ttl == HConstants.FOREVER || 
-              now < readkey.getTimestamp() + ttl) {
+          if (notExpiredAndNotInDeletes(this.ttl, readkey, now, deletes)) {
             candidateKeys.put(strippedKey, Long.valueOf(readkey.getTimestamp()));
-          } else {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("rowAtOrBeforeWithCandidates 2: " + readkey +
-                  ": expired, skipped");
-            }
           }
         } else {
           // If the candidate keys contain any that might match by timestamp,
@@ -1902,4 +1887,4 @@
       return key;
     }
   }
-}
+}
\ No newline at end of file

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java?rev=681822&r1=681821&r2=681822&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java Fri Aug  1 13:59:52 2008
@@ -24,6 +24,7 @@
 import java.rmi.UnexpectedException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -327,24 +328,38 @@
    * @param row Row to look for.
    * @param candidateKeys Map of candidate keys (Accumulation over lots of
    * lookup over stores and memcaches)
+   * @param deletes Deletes collected so far.
    */
-  void getRowKeyAtOrBefore(final byte [] row, 
-    SortedMap<HStoreKey, Long> candidateKeys) {
+  void getRowKeyAtOrBefore(final byte [] row,
+      final SortedMap<HStoreKey, Long> candidateKeys) {
+    getRowKeyAtOrBefore(row, candidateKeys, new HashSet<HStoreKey>());
+  }
+  
+  /**
+   * @param row Row to look for.
+   * @param candidateKeys Map of candidate keys (Accumulation over lots of
+   * lookup over stores and memcaches)
+   * @param deletes Deletes collected so far.
+   */
+  void getRowKeyAtOrBefore(final byte [] row,
+      final SortedMap<HStoreKey, Long> candidateKeys, 
+      final Set<HStoreKey> deletes) {
     this.lock.readLock().lock();
     try {
       synchronized (memcache) {
-        internalGetRowKeyAtOrBefore(memcache, row, candidateKeys);
+        getRowKeyAtOrBefore(memcache, row, candidateKeys, deletes);
       }
       synchronized (snapshot) {
-        internalGetRowKeyAtOrBefore(snapshot, row, candidateKeys);
+        getRowKeyAtOrBefore(snapshot, row, candidateKeys, deletes);
       }
     } finally {
       this.lock.readLock().unlock();
     }
   }
 
-  private void internalGetRowKeyAtOrBefore(SortedMap<HStoreKey, byte []> map,
-      byte [] row, SortedMap<HStoreKey, Long> candidateKeys) {
+  private void getRowKeyAtOrBefore(final SortedMap<HStoreKey, byte []> map,
+      final byte [] row, final SortedMap<HStoreKey, Long> candidateKeys,
+      final Set<HStoreKey> deletes) {
     // We want the earliest possible to start searching from.  Start before
     // the candidate key in case it turns out a delete came in later.
     HStoreKey search_key = candidateKeys.isEmpty()? new HStoreKey(row):
@@ -371,13 +386,12 @@
         found_key = key_iterator.next();
         if (Bytes.compareTo(found_key.getRow(), row) <= 0) {
           if (HLogEdit.isDeleted(tailMap.get(found_key))) {
-            handleDeleted(found_key, candidateKeys);
+            handleDeleted(found_key, candidateKeys, deletes);
             if (deletedOrExpiredRow == null) {
               deletedOrExpiredRow = found_key;
             }
           } else {
-            if (ttl == HConstants.FOREVER ||
-                now < found_key.getTimestamp() + ttl) {
+            if (HStore.notExpiredAndNotInDeletes(this.ttl, found_key, now, deletes)) {
               HStoreKey strippedKey = stripTimestamp(found_key);
               candidateKeys.put(strippedKey,
                 new Long(found_key.getTimestamp()));
@@ -395,12 +409,13 @@
         }
       }
       if (candidateKeys.isEmpty() && deletedOrExpiredRow != null) {
-        getRowKeyBefore(map, deletedOrExpiredRow, candidateKeys, victims, now);
+        getRowKeyBefore(map, deletedOrExpiredRow, candidateKeys, victims,
+          deletes, now);
       }
     } else {
       // The tail didn't contain any keys that matched our criteria, or was 
       // empty. Examine all the keys that proceed our splitting point.
-      getRowKeyBefore(map, search_key, candidateKeys, victims, now);
+      getRowKeyBefore(map, search_key, candidateKeys, victims, deletes, now);
     }
     // Remove expired victims from the map.
     for (HStoreKey victim: victims) {
@@ -419,7 +434,8 @@
    */
   private void getRowKeyBefore(SortedMap<HStoreKey, byte []> map,
       HStoreKey search_key, SortedMap<HStoreKey, Long> candidateKeys,
-      List<HStoreKey> victims, final long now) {
+      final List<HStoreKey> expires, final Set<HStoreKey> deletes,
+      final long now) {
     SortedMap<HStoreKey, byte []> headMap = map.headMap(search_key);
     // If we tried to create a headMap and got an empty map, then there are
     // no keys at or before the search key, so we're done.
@@ -429,35 +445,36 @@
     
     // If there aren't any candidate keys at this point, we need to search
     // backwards until we find at least one candidate or run out of headMap.
-    HStoreKey found_key = null;
     if (candidateKeys.isEmpty()) {
       Set<HStoreKey> keys = headMap.keySet();
       HStoreKey [] cells = keys.toArray(new HStoreKey[keys.size()]);
       byte [] lastRowFound = null;
       for (int i = cells.length - 1; i >= 0; i--) {
-        HStoreKey thisKey = cells[i];
+        HStoreKey found_key = cells[i];
         // if the last row we found a candidate key for is different than
         // the row of the current candidate, we can stop looking -- if its
         // not a delete record.
-        boolean deleted = HLogEdit.isDeleted(headMap.get(thisKey));
+        boolean deleted = HLogEdit.isDeleted(headMap.get(found_key));
         if (lastRowFound != null &&
-            !Bytes.equals(lastRowFound, thisKey.getRow()) && !deleted) {
+            !Bytes.equals(lastRowFound, found_key.getRow()) && !deleted) {
           break;
         }
         // If this isn't a delete, record it as a candidate key. Also 
         // take note of the row of this candidate so that we'll know when
         // we cross the row boundary into the previous row.
         if (!deleted) {
-          if (ttl == HConstants.FOREVER || now < thisKey.getTimestamp() + ttl) {
-            lastRowFound = thisKey.getRow();
-            candidateKeys.put(stripTimestamp(thisKey), 
-              new Long(thisKey.getTimestamp()));
+          if (HStore.notExpiredAndNotInDeletes(this.ttl, found_key, now, deletes)) {
+            lastRowFound = found_key.getRow();
+            candidateKeys.put(stripTimestamp(found_key), 
+              new Long(found_key.getTimestamp()));
           } else {
-            victims.add(found_key);
+            expires.add(found_key);
             if (LOG.isDebugEnabled()) {
               LOG.debug("getRowKeyBefore: " + found_key + ": expired, skipped");
             }
           }
+        } else {
+          deletes.add(found_key);
         }
       }
     } else {
@@ -469,16 +486,17 @@
         headMap.tailMap(new HStoreKey(headMap.lastKey().getRow()));
       Iterator<HStoreKey> key_iterator = thisRowTailMap.keySet().iterator();
       do {
-        found_key = key_iterator.next();
+        HStoreKey found_key = key_iterator.next();
         if (HLogEdit.isDeleted(thisRowTailMap.get(found_key))) {
-          handleDeleted(found_key, candidateKeys);
+          handleDeleted(found_key, candidateKeys, deletes);
         } else {
           if (ttl == HConstants.FOREVER ||
-              now < found_key.getTimestamp() + ttl) {
+              now < found_key.getTimestamp() + ttl ||
+              !deletes.contains(found_key)) {
             candidateKeys.put(stripTimestamp(found_key), 
               Long.valueOf(found_key.getTimestamp()));
           } else {
-            victims.add(found_key);
+            expires.add(found_key);
             if (LOG.isDebugEnabled()) {
               LOG.debug("internalGetRowKeyAtOrBefore: " + found_key +
                 ": expired, skipped");
@@ -490,7 +508,9 @@
   }
 
   private void handleDeleted(final HStoreKey k,
-      final SortedMap<HStoreKey, Long> candidateKeys) {
+      final SortedMap<HStoreKey, Long> candidateKeys,
+      final Set<HStoreKey> deletes) {
+    deletes.add(k);
     HStoreKey strippedKey = stripTimestamp(k);
     if (candidateKeys.containsKey(strippedKey)) {
       long bestCandidateTs = 

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestGet2.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestGet2.java?rev=681822&r1=681821&r2=681822&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestGet2.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestGet2.java Fri Aug  1 13:59:52 2008
@@ -91,7 +91,7 @@
       batchUpdate.delete(COLUMNS[0]);
       region.batchUpdate(batchUpdate);
       
-      results = region.getClosestRowBefore(Bytes.toBytes(T10));
+      results = region.getClosestRowBefore(Bytes.toBytes(T20));
       assertEquals(T10, new String(results.get(COLUMNS[0]).getValue()));
       
       batchUpdate = new BatchUpdate(T30);