You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2007/09/10 17:56:18 UTC

svn commit: r574287 [1/2] - in /lucene/hadoop/trunk/src/contrib/hbase: ./ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/io/ src/test/org/apache/hadoop/hbase/

Author: stack
Date: Mon Sep 10 08:56:16 2007
New Revision: 574287

URL: http://svn.apache.org/viewvc?rev=574287&view=rev
Log:
HADOOP-1784 Delete: Fix scanners and gets so they work properly in the
presence of deletes. Added a deleteAll to remove all cells equal to or older
than the passed timestamp.  Fixed compaction so deleted cells do not make it
out into compacted output.  Also ensure that versions beyond the column's
maximum are dropped when compacting.

Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HInternalScannerInterface.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HScannerInterface.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchOperation.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestMasterAdmin.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Mon Sep 10 08:56:16 2007
@@ -8,6 +8,11 @@
   NEW FEATURES
     HADOOP-1768 FS command using Hadoop FsShell operations
                 (Edward Yoon via Stack)
+    HADOOP-1784 Delete: Fix scanners and gets so they work properly in presence
+                of deletes. Added a deleteAll to remove all cells equal to or
+                older than passed timestamp.  Fixed compaction so deleted cells
+                do not make it out into compacted output.  Ensure also that
+                versions > column max are dropped compacting.
 
   OPTIMIZATIONS
 

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java Mon Sep 10 08:56:16 2007
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase;
 
 import java.io.IOException;
+import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.Vector;
 import java.util.regex.Pattern;
@@ -205,7 +206,7 @@
    * 
    * @see org.apache.hadoop.hbase.HScannerInterface#next(org.apache.hadoop.hbase.HStoreKey, java.util.SortedMap)
    */
-  public boolean next(HStoreKey key, TreeMap<Text, byte []> results)
+  public boolean next(HStoreKey key, SortedMap<Text, byte []> results)
   throws IOException {
     // Find the next row label (and timestamp)
     Text chosenRow = null;
@@ -218,7 +219,6 @@
               || (keys[i].getRow().compareTo(chosenRow) < 0)
               || ((keys[i].getRow().compareTo(chosenRow) == 0)
                   && (keys[i].getTimestamp() > chosenTimestamp)))) {
-
         chosenRow = new Text(keys[i].getRow());
         chosenTimestamp = keys[i].getTimestamp();
       }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java Mon Sep 10 08:56:16 2007
@@ -103,7 +103,7 @@
   // be the first to be reassigned if the server(s) they are being served by
   // should go down.
 
-  /** The root table's name. */
+  /** The root table's name.*/
   static final Text ROOT_TABLE_NAME = new Text("-ROOT-");
 
   /** The META table's name. */
@@ -139,10 +139,28 @@
   static final Text COL_SPLITB = new Text(COLUMN_FAMILY_STR + "splitB");
   // Other constants
 
-  /** used by scanners, etc when they want to start at the beginning of a region */
-  static final Text EMPTY_START_ROW = new Text();
+  /**
+   * An empty instance of Text.
+   */
+  static final Text EMPTY_TEXT = new Text();
+  
+  /**
+   * Used by scanners, etc when they want to start at the beginning of a region
+   */
+  static final Text EMPTY_START_ROW = EMPTY_TEXT;
 
   /** When we encode strings, we always specify UTF8 encoding */
   static final String UTF8_ENCODING = "UTF-8";
 
+  /**
+   * Timestamp to use when we want to refer to the latest cell.
+   * This is the timestamp sent by clients when no timestamp is specified on
+   * commit.
+   */
+  static final long LATEST_TIMESTAMP = Long.MAX_VALUE;
+  
+  /**
+   * Define for 'return-all-versions'.
+   */
+  static final int ALL_VERSIONS = -1;
 }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HInternalScannerInterface.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HInternalScannerInterface.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HInternalScannerInterface.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HInternalScannerInterface.java Mon Sep 10 08:56:16 2007
@@ -19,11 +19,6 @@
  */
 package org.apache.hadoop.hbase;
 
-import java.io.IOException;
-import java.util.TreeMap;
-
-import org.apache.hadoop.io.Text;
-
 /**
  * Internally, we need to be able to determine if the scanner is doing wildcard
  * column matches (when only a column family is specified or if a column regex
@@ -31,29 +26,7 @@
  * specified. If so, we need to ignore the timestamp to ensure that we get all
  * the family members, as they may have been last updated at different times.
  */
-public interface HInternalScannerInterface {
-  
-  /**
-   * Grab the next row's worth of values. The HScanner will return the most
-   * recent data value for each row that is not newer than the target time.
-   * 
-   * If a dataFilter is defined, it will be used to skip rows that do not
-   * match its criteria. It may cause the scanner to stop prematurely if it
-   * knows that it will no longer accept the remaining results.
-   * 
-   * @param key HStoreKey containing row and timestamp
-   * @param results Map of column/value pairs
-   * @return true if a value was found
-   * @throws IOException
-   */
-  public boolean next(HStoreKey key, TreeMap<Text, byte []> results)
-  throws IOException;
-  
-  /**
-   * Close the scanner.
-   */
-  public void close();
-  
+public interface HInternalScannerInterface extends HScannerInterface {
   /** @return true if the scanner is matching a column family or regex */
   public boolean isWildcardScanner();
   

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java Mon Sep 10 08:56:16 2007
@@ -175,18 +175,15 @@
    * @return An array of byte arrays ordered by timestamp.
    */
   public byte [][] get(final HStoreKey key, final int numVersions) {
-    List<byte []> results = new ArrayList<byte[]>();
     this.lock.obtainReadLock();
     try {
-      ArrayList<byte []> result =
-        get(memcache, key, numVersions - results.size());
-      results.addAll(0, result);
+      ArrayList<byte []> results = get(memcache, key, numVersions);
       for (int i = history.size() - 1; i >= 0; i--) {
         if (numVersions > 0 && results.size() >= numVersions) {
           break;
         }
-        result = get(history.elementAt(i), key, numVersions - results.size());
-        results.addAll(results.size(), result);
+        results.addAll(results.size(),
+          get(history.elementAt(i), key, numVersions - results.size()));
       }
       return (results.size() == 0)?
         null: ImmutableBytesWritable.toArray(results);
@@ -194,7 +191,6 @@
       this.lock.releaseReadLock();
     }
   }
-
   
   /**
    * Return all the available columns for the given key.  The key indicates a 
@@ -248,7 +244,8 @@
    * @param map
    * @param key
    * @param numVersions
-   * @return Ordered list of items found in passed <code>map</code>
+   * @return Ordered list of items found in passed <code>map</code>.  If no
+   * matching values, returns an empty list (does not return null).
    */
   ArrayList<byte []> get(final TreeMap<HStoreKey, byte []> map,
       final HStoreKey key, final int numVersions) {
@@ -261,21 +258,87 @@
     for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
       HStoreKey itKey = es.getKey();
       if (itKey.matchesRowCol(curKey)) {
-        if(HGlobals.deleteBytes.compareTo(es.getValue()) == 0) {
-          // TODO: Shouldn't this be a continue rather than a break?  Perhaps
-          // the intent is that this DELETE_BYTES is meant to suppress older
-          // info -- see 5.4 Compactions in BigTable -- but how does this jibe
-          // with being able to remove one version only?
-          break;
+        if (!isDeleted(es.getValue())) {
+          result.add(tailMap.get(itKey));
+          curKey.setVersion(itKey.getTimestamp() - 1);
         }
-        result.add(tailMap.get(itKey));
-        curKey.setVersion(itKey.getTimestamp() - 1);
       }
       if (numVersions > 0 && result.size() >= numVersions) {
         break;
       }
     }
     return result;
+  }
+
+  /**
+   * Get <code>versions</code> keys matching the origin key's
+   * row/column/timestamp and those of an older vintage.
+   * Default access so can be accessed out of {@link HRegionServer}.
+   * @param origin Where to start searching.
+   * @param versions How many versions to return. Pass
+   * {@link HConstants#ALL_VERSIONS} to retrieve all.
+   * @return Ordered list of <code>versions</code> keys going from newest back.
+   * @throws IOException
+   */
+  List<HStoreKey> getKeys(final HStoreKey origin, final int versions) {
+    this.lock.obtainReadLock();
+    try {
+      List<HStoreKey> results = getKeys(this.memcache, origin, versions);
+      for (int i = history.size() - 1; i >= 0; i--) {
+        results.addAll(results.size(), getKeys(history.elementAt(i), origin,
+          versions == HConstants.ALL_VERSIONS? versions:
+           (results != null? versions - results.size(): versions)));
+      }
+      return results;
+    } finally {
+      this.lock.releaseReadLock();
+    }
+  }
+
+  /*
+   * @param origin Where to start searching.
+   * @param versions How many versions to return. Pass
+   * {@link HConstants#ALL_VERSIONS} to retrieve all.
+   * @return List of all keys that are of the same row and column and of
+   * equal or older timestamp.  If no keys, returns an empty List. Does not
+   * return null.
+   */
+  private List<HStoreKey> getKeys(final TreeMap<HStoreKey, byte []> map,
+      final HStoreKey origin, final int versions) {
+    List<HStoreKey> result = new ArrayList<HStoreKey>();
+    SortedMap<HStoreKey, byte []> tailMap = map.tailMap(origin);
+    for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
+      HStoreKey key = es.getKey();
+      if (!key.matchesRowCol(origin)) {
+        break;
+      }
+      if (!isDeleted(es.getValue())) {
+        result.add(key);
+        if (versions != HConstants.ALL_VERSIONS && result.size() >= versions) {
+          // We have enough results.  Return.
+          break;
+        }
+      }
+    }
+    return result;
+  }
+
+  /**
+   * @param key
+   * @return True if the memcache has an entry for the key and its content is
+   * {@link HGlobals#deleteBytes}. Use when checking values in the store: on
+   * occasion the memcache holds the record that a cell has been deleted.
+   */
+  boolean isDeleted(final HStoreKey key) {
+    return isDeleted(this.memcache.get(key));
+  }
+
+  /**
+   * @param value
+   * @return True if value is non-null and equals {@link HGlobals#deleteBytes}.
+   */
+  boolean isDeleted(final byte [] value) {
+    return (value == null)? false: HGlobals.deleteBytes.compareTo(value) == 0;
   }
 
   /**

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Mon Sep 10 08:56:16 2007
@@ -25,6 +25,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.Vector;
@@ -581,6 +582,9 @@
     lock.obtainReadLock();
     try {
       HStore.HStoreSize biggest = largestHStore(midKey);
+      if (biggest == null) {
+        return false;
+      }
       long triggerSize =
         this.desiredMaxFileSize + (this.desiredMaxFileSize / 2);
       boolean split = (biggest.getAggregate() >= triggerSize);
@@ -911,26 +915,47 @@
     }
   }
 
-  /** Private implementation: get the value for the indicated HStoreKey */
-  private byte [][] get(HStoreKey key, int numVersions) throws IOException {
-
+  private byte [][] get(final HStoreKey key, final int numVersions)
+  throws IOException {
     lock.obtainReadLock();
     try {
       // Check the memcache
-      byte [][] result = memcache.get(key, numVersions);
-      if(result != null) {
-        return result;
+      byte [][] memcacheResult = this.memcache.get(key, numVersions);
+      // If we got sufficient versions from memcache, return.
+      if (memcacheResult != null && memcacheResult.length == numVersions) {
+        return memcacheResult;
       }
 
-      // If unavailable in memcache, check the appropriate HStore
+      // Check hstore for more versions.
       Text colFamily = HStoreKey.extractFamily(key.getColumn());
       HStore targetStore = stores.get(colFamily);
       if(targetStore == null) {
-        return null;
+        // There are no stores.  Return what we got from memcache.
+        return memcacheResult;
       }
-
-      return targetStore.get(key, numVersions);
       
+      // Update the number of versions we need to fetch from the store.
+      int amendedNumVersions = numVersions;
+      if (memcacheResult != null) {
+        amendedNumVersions -= memcacheResult.length;
+      }
+      byte [][] result =
+        targetStore.get(key, amendedNumVersions, this.memcache);
+      if (result == null) {
+        result = memcacheResult;
+      } else if (memcacheResult != null) {
+        // We have results from both memcache and from stores.  Put them
+        // together in an array in the proper order.
+        byte [][] storeResult = result;
+        result = new byte [memcacheResult.length + result.length][];
+        for (int i = 0; i < memcacheResult.length; i++) {
+          result[i] = memcacheResult[i];
+        }
+        for (int i = 0; i < storeResult.length; i++) {
+          result[i + memcacheResult.length] = storeResult[i];
+        }
+      }
+      return result;
     } finally {
       lock.releaseReadLock();
     }
@@ -963,6 +988,45 @@
   }
 
   /**
+   * Get all keys matching the origin key's row/column/timestamp and those
+   * of an older vintage.
+   * Default access so can be accessed out of {@link HRegionServer}.
+   * @param origin Where to start searching.
+   * @return Ordered list of keys going from newest on back.
+   * @throws IOException
+   */
+  List<HStoreKey> getKeys(final HStoreKey origin) throws IOException {
+    return getKeys(origin, ALL_VERSIONS);
+  }
+  
+  /**
+   * Get <code>versions</code> keys matching the origin key's
+   * row/column/timestamp and those of an older vintage.
+   * Default access so can be accessed out of {@link HRegionServer}.
+   * @param origin Where to start searching.
+   * @param versions How many versions to return. Pass
+   * {@link HConstants#ALL_VERSIONS} to retrieve all.
+   * @return Ordered list of <code>versions</code> keys going from newest back.
+   * @throws IOException
+   */
+  List<HStoreKey> getKeys(final HStoreKey origin, final int versions)
+  throws IOException {
+    List<HStoreKey> keys = this.memcache.getKeys(origin, versions);
+    if (versions != ALL_VERSIONS && keys.size() >= versions) {
+      return keys;
+    }
+    // Check hstore for more versions.
+    Text colFamily = HStoreKey.extractFamily(origin.getColumn());
+    HStore targetStore = stores.get(colFamily);
+    if (targetStore != null) {
+      // Pass versions without modification since in the store getKeys, it
+      // includes the size of the passed <code>keys</code> array when counting.
+      keys = targetStore.getKeys(origin, keys, versions);
+    }
+    return keys;
+  }
+
+  /**
    * Return an iterator that scans over the HRegion, returning the indicated 
    * columns for only the rows that match the data filter.  This Iterator must be closed by the caller.
    *
@@ -1110,8 +1174,8 @@
   }
 
   /**
-   * Delete a value or write a value. This is a just a convenience method for put().
-   *
+   * Delete a value or write a value.
+   * This is just a convenience method for put().
    * @param lockid lock id obtained from startUpdate
    * @param targetCol name of column to be deleted
    * @throws IOException
@@ -1119,6 +1183,51 @@
   public void delete(long lockid, Text targetCol) throws IOException {
     localput(lockid, targetCol, HGlobals.deleteBytes.get());
   }
+  
+  /**
+   * Delete all cells of the same age as the passed timestamp or older.
+   * @param row
+   * @param column
+   * @param ts Delete all entries that have this timestamp or older
+   * @throws IOException
+   */
+  public void deleteAll(final Text row, final Text column, final long ts)
+  throws IOException {
+    deleteMultiple(row, column, ts, ALL_VERSIONS);
+  }
+  
+  /**
+   * Delete one or many cells.
+   * Used to support {@link #deleteAll(Text, Text, long)} and deletion of
+   * latest cell.
+   * @param row
+   * @param column
+   * @param ts Timestamp to start search on.
+   * @param versions How many versions to delete. Pass
+   * {@link HConstants#ALL_VERSIONS} to delete all.
+   * @throws IOException
+   */
+  void deleteMultiple(final Text row, final Text column, final long ts,
+    final int versions)
+  throws IOException {
+    lock.obtainReadLock();
+    try {
+      checkColumn(column);
+      HStoreKey origin = new HStoreKey(row, column, ts);
+      synchronized(row) {
+        List<HStoreKey> keys = getKeys(origin, versions);
+        if (keys.size() > 0) {
+          TreeMap<Text, byte []> edits = new TreeMap<Text, byte []>();
+          edits.put(column, HGlobals.deleteBytes.get());
+          for (HStoreKey key: keys) {
+            update(row, key.getTimestamp(), edits);
+          }
+        }
+      }
+    } finally {
+      lock.releaseReadLock();
+    }
+  }
 
   /**
    * Private implementation.
@@ -1202,10 +1311,11 @@
    * Once updates hit the change log, they are safe.  They will either be moved 
    * into an HStore in the future, or they will be recovered from the log.
    * @param lockid Lock for row we're to commit.
-   * @param timestamp the time to associate with this change
+   * @param timestamp the time to associate with this change.
    * @throws IOException
    */
-  public void commit(final long lockid, long timestamp) throws IOException {
+  public void commit(final long lockid, final long timestamp)
+  throws IOException {
     // Remove the row from the pendingWrites list so 
     // that repeated executions won't screw this up.
     Text row = getRowFromLock(lockid);
@@ -1216,19 +1326,75 @@
     // This check makes sure that another thread from the client
     // hasn't aborted/committed the write-operation
     synchronized(row) {
-      // Add updates to the log and add values to the memcache.
       Long lid = Long.valueOf(lockid);
-      TreeMap<Text, byte []> columns =  this.targetColumns.get(lid);
-      if (columns != null && columns.size() > 0) {
-        log.append(regionInfo.regionName, regionInfo.tableDesc.getName(),
-          row, columns, timestamp);
-        memcache.add(row, columns, timestamp);
-        // OK, all done!
-      }
+      update(row, timestamp, this.targetColumns.get(lid));
       targetColumns.remove(lid);
       releaseRowLock(row);
     }
   }
+  
+  /**
+   * This method for unit testing only.
+   * Does each operation individually so can do appropriate
+   * {@link HConstants#LATEST_TIMESTAMP} action.  Tries to mimic how
+   * {@link HRegionServer#batchUpdate(Text, long, org.apache.hadoop.hbase.io.BatchUpdate)}
+   * works when passed a timestamp of LATEST_TIMESTAMP.
+   * @param lockid Lock for row we're to commit.
+   * @throws IOException 
+   * @throws IOException
+   * @see #commit(long, long)
+   */
+  void commit(final long lockid) throws IOException {
+    // Remove the row from the pendingWrites list so 
+    // that repeated executions won't screw this up.
+    Text row = getRowFromLock(lockid);
+    if(row == null) {
+      throw new LockException("No write lock for lockid " + lockid);
+    }
+    
+    // This check makes sure that another thread from the client
+    // hasn't aborted/committed the write-operation
+    synchronized(row) {
+      Long lid = Long.valueOf(lockid);
+      TreeMap<Text, byte []> updatesByColumn = this.targetColumns.get(lid);
+      // Run updates one at a time so we can supply appropriate timestamp
+      long now = System.currentTimeMillis();
+      for (Map.Entry<Text, byte []>e: updatesByColumn.entrySet()) {
+        if (HGlobals.deleteBytes.equals(e.getValue())) {
+          // It's a delete.  Delete latest.  deleteMultiple calls update for us.
+          // It re-acquires the row lock, but since we already hold it, that
+          // should be fine.
+          deleteMultiple(row, e.getKey(), LATEST_TIMESTAMP, 1);
+          continue;
+        }
+        // Must be a 'put'.
+        TreeMap<Text, byte []> putEdit = new TreeMap<Text, byte []>();
+        putEdit.put(e.getKey(), e.getValue());
+        update(row, now, putEdit);
+      }
+      this.targetColumns.remove(lid);
+      releaseRowLock(row);
+    }
+  }
+   
+  /* 
+   * Add updates to the log and add values to the memcache.
+   * Warning: Assumption is caller has lock on passed in row.
+   * @param row Row to update.
+   * @param timestamp Timestamp to record the updates against
+   * @param updatesByColumn Cell updates by column
+   * @throws IOException
+   */
+  private void update(final Text row, final long timestamp,
+    final TreeMap<Text, byte []> updatesByColumn)
+  throws IOException {
+    if (updatesByColumn == null || updatesByColumn.size() <= 0) {
+      return;
+    }
+    this.log.append(regionInfo.regionName, regionInfo.tableDesc.getName(),
+        row, updatesByColumn, timestamp);
+    this.memcache.add(row, updatesByColumn, timestamp);
+  }
 
   //////////////////////////////////////////////////////////////////////////////
   // Support code
@@ -1250,7 +1416,11 @@
     }
   }
   
-  /** Make sure this is a valid column for the current table */
+  /**
+   * Make sure this is a valid column for the current table
+   * @param columnName
+   * @throws IOException
+   */
   void checkColumn(Text columnName) throws IOException {
     Text family = new Text(HStoreKey.extractFamily(columnName) + ":");
     if(! regionInfo.tableDesc.hasFamily(family)) {
@@ -1359,10 +1529,6 @@
         dataFilter.reset();
       }
       this.scanners = new HInternalScannerInterface[stores.length + 1];
-      for(int i = 0; i < this.scanners.length; i++) {
-        this.scanners[i] = null;
-      }
-      
       this.resultSets = new TreeMap[scanners.length];
       this.keys = new HStoreKey[scanners.length];
       this.wildcardMatch = false;
@@ -1424,12 +1590,11 @@
     public boolean isMultipleMatchScanner() {
       return multipleMatchers;
     }
-    
-    /**
-     * {@inheritDoc}
-     */
-    public boolean next(HStoreKey key, TreeMap<Text, byte[]> results)
+
+    public boolean next(HStoreKey key, SortedMap<Text, byte[]> results)
     throws IOException {
+      // Filtered flag is set by filters.  If a cell has been 'filtered out'
+      // -- i.e. it is not to be returned to the caller -- the flag is 'true'.
       boolean filtered = true;
       boolean moreToFollow = true;
       while (filtered && moreToFollow) {
@@ -1446,19 +1611,27 @@
             chosenTimestamp = keys[i].getTimestamp();
           }
         }
-
+        
         // Filter whole row by row key?
         filtered = dataFilter != null? dataFilter.filter(chosenRow) : false;
 
         // Store the key and results for each sub-scanner. Merge them as
         // appropriate.
-        if (chosenTimestamp > 0 && !filtered) {
+        if (chosenTimestamp >= 0 && !filtered) {
+          // Here we are setting the passed in key with current row+timestamp
           key.setRow(chosenRow);
           key.setVersion(chosenTimestamp);
-          key.setColumn(new Text(""));
-
+          key.setColumn(HConstants.EMPTY_TEXT);
+          // Keep list of deleted cell keys within this row.  We need this
+          // because as we go through scanners, the delete record may be in an
+          // early scanner and then the same record with a non-delete, non-null
+          // value in a later. Without history of what we've seen, we'll return
+          // deleted values. This List should not ever grow too large since we
+          // are only keeping rows and columns that match those set on the
+          // scanner and which have delete values.  If memory usage becomes a
+          // problem, could redo as bloom filter.
+          List<HStoreKey> deletes = new ArrayList<HStoreKey>();
           for (int i = 0; i < scanners.length && !filtered; i++) {
-
             while ((scanners[i] != null
                 && !filtered
                 && moreToFollow)
@@ -1481,8 +1654,19 @@
               // but this had the effect of overwriting newer
               // values with older ones. So now we only insert
               // a result if the map does not contain the key.
+              HStoreKey hsk = new HStoreKey(key.getRow(), EMPTY_TEXT,
+                key.getTimestamp());
               for (Map.Entry<Text, byte[]> e : resultSets[i].entrySet()) {
-                if (!filtered && moreToFollow &&
+                hsk.setColumn(e.getKey());
+                if (HGlobals.deleteBytes.equals(e.getValue())) {
+                  if (!deletes.contains(hsk)) {
+                    // Key changes as we cycle the for loop so add a copy to
+                    // the set of deletes.
+                    deletes.add(new HStoreKey(hsk));
+                  }
+                } else if (!deletes.contains(hsk) &&
+                    !filtered &&
+                    moreToFollow &&
                     !results.containsKey(e.getKey())) {
                   if (dataFilter != null) {
                     // Filter whole row by column data?
@@ -1496,7 +1680,6 @@
                   results.put(e.getKey(), e.getValue());
                 }
               }
-
               resultSets[i].clear();
               if (!scanners[i].next(keys[i], resultSets[i])) {
                 closeScanner(i);
@@ -1516,8 +1699,8 @@
             }
           }
         }
-        
-        moreToFollow = chosenTimestamp > 0;
+
+        moreToFollow = chosenTimestamp >= 0;
         
         if (dataFilter != null) {
           if (moreToFollow) {
@@ -1533,6 +1716,17 @@
             LOG.debug("ROWKEY = " + chosenRow + ", FILTERED = " + filtered);
           }
         }
+        
+        if (results.size() <= 0 && !filtered) {
+          // There were no results found for this row.  Mark it as
+          // 'filtered'-out otherwise we will not move on to the next row.
+          filtered = true;
+        }
+      }
+      
+      // If we got no results, then there is no more to follow.
+      if (results == null || results.size() <= 0) {
+        moreToFollow = false;
       }
       
       // Make sure scanners closed if no more results
@@ -1551,7 +1745,11 @@
     /** Shut down a single scanner */
     void closeScanner(int i) {
       try {
-        scanners[i].close();
+        try {
+          scanners[i].close();
+        } catch (IOException e) {
+          LOG.warn("Failed closeing scanner " + i, e);
+        }
       } finally {
         scanners[i] = null;
         keys[i] = null;

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java Mon Sep 10 08:56:16 2007
@@ -38,8 +38,8 @@
   /** 
    * Get metainfo about an HRegion
    * 
-   * @param regionName                  - name of the region
-   * @return                            - HRegionInfo object for region
+   * @param regionName name of the region
+   * @return HRegionInfo object for region
    * @throws NotServingRegionException
    */
   public HRegionInfo getRegionInfo(final Text regionName)
@@ -69,7 +69,7 @@
    * @throws IOException
    */
   public byte [][] get(final Text regionName, final Text row,
-      final Text column, final int numVersions)
+    final Text column, final int numVersions)
   throws IOException;
   
   /**
@@ -107,7 +107,21 @@
    * @param b BatchUpdate
    * @throws IOException
    */
-  public void batchUpdate(Text regionName, long timestamp, BatchUpdate b) throws IOException;
+  public void batchUpdate(Text regionName, long timestamp, BatchUpdate b)
+  throws IOException;
+  
+  /**
+   * Delete all cells that match the passed row and column and whose
+   * timestamp is equal-to or older than the passed timestamp.
+   *
+   * @param regionName region name
+   * @param row row key
+   * @param column column key
+   * @param timestamp Delete all entries that have this timestamp or older
+   * @throws IOException
+   */
+  public void deleteAll(Text regionName, Text row, Text column, long timestamp)
+  throws IOException;
   
   //
   // remote scanner interface

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Mon Sep 10 08:56:16 2007
@@ -24,6 +24,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.SortedMap;
@@ -1075,22 +1076,13 @@
       leases.renewLease(scannerId, scannerId);
 
       // Collect values to be returned here
-
       MapWritable values = new MapWritable();
-
-      // Keep getting rows until we find one that has at least one non-deleted column value
-
       HStoreKey key = new HStoreKey();
       TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
       while (s.next(key, results)) {
         for(Map.Entry<Text, byte []> e: results.entrySet()) {
-          HStoreKey k = new HStoreKey(key.getRow(), e.getKey(), key.getTimestamp());
-          byte [] val = e.getValue();
-          if (HGlobals.deleteBytes.compareTo(val) == 0) {
-            // Column value is deleted. Don't return it.
-            continue;
-          }
-          values.put(k, new ImmutableBytesWritable(val));
+          values.put(new HStoreKey(key.getRow(), e.getKey(), key.getTimestamp()),
+            new ImmutableBytesWritable(e.getValue()));
         }
 
         if(values.size() > 0) {
@@ -1099,7 +1091,6 @@
         }
 
         // No data for this row, go get another.
-
         results.clear();
       }
       return values;
@@ -1110,26 +1101,46 @@
     }
   }
 
-  /** {@inheritDoc} */
   public void batchUpdate(Text regionName, long timestamp, BatchUpdate b)
-    throws IOException {
-    
+  throws IOException {  
     requestCount.incrementAndGet();
+    // If timestamp == LATEST_TIMESTAMP and we have deletes, then they need
+    // special treatment.  For these we need to first find the latest cell so
+    // when we write the delete, we write it with the latest cell's timestamp
+    // so the delete record overshadows.  This means deletes and puts do not
+    // happen within the same row lock.
+    List<Text> deletes = null;
     try {
       long lockid = startUpdate(regionName, b.getRow());
       for(BatchOperation op: b) {
         switch(op.getOp()) {
-        case BatchOperation.PUT_OP:
+        case PUT:
           put(regionName, lockid, op.getColumn(), op.getValue());
           break;
 
-        case BatchOperation.DELETE_OP:
-          delete(regionName, lockid, op.getColumn());
+        case DELETE:
+          if (timestamp == LATEST_TIMESTAMP) {
+            // Save off these deletes.
+            if (deletes == null) {
+              deletes = new ArrayList<Text>();
+            }
+            deletes.add(op.getColumn());
+          } else {
+            delete(regionName, lockid, op.getColumn());
+          }
           break;
         }
       }
-      commit(regionName, lockid, timestamp);
+      commit(regionName, lockid,
+        (timestamp == LATEST_TIMESTAMP)? System.currentTimeMillis(): timestamp);
       
+      if (deletes != null && deletes.size() > 0) {
+        // We have some LATEST_TIMESTAMP deletes to run.
+        HRegion r = getRegion(regionName);
+        for (Text column: deletes) {
+          r.deleteMultiple(b.getRow(), column, LATEST_TIMESTAMP, 1);
+        }
+      }
     } catch (IOException e) {
       checkFileSystem();
       throw e;
@@ -1158,7 +1169,6 @@
       }
       leases.createLease(scannerId, scannerId, new ScannerListener(scannerName));
       return scannerId;
-
     } catch (IOException e) {
       if (e instanceof RemoteException) {
         try {
@@ -1217,7 +1227,11 @@
         s = scanners.remove(this.scannerName);
       }
       if (s != null) {
-        s.close();
+        try {
+          s.close();
+        } catch (IOException e) {
+          LOG.error("Closing scanner", e);
+        }
       }
     }
   }
@@ -1241,9 +1255,15 @@
 
   protected void delete(Text regionName, long lockid, Text column) 
     throws IOException {
-
     HRegion region = getRegion(regionName);
     region.delete(lockid, column);
+  }
+  
+  public void deleteAll(final Text regionName, final Text row,
+      final Text column, final long timestamp) 
+  throws IOException {
+    HRegion region = getRegion(regionName);
+    region.deleteAll(row, column, timestamp);
   }
 
   protected void commit(Text regionName, final long lockid,

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HScannerInterface.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HScannerInterface.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HScannerInterface.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HScannerInterface.java Mon Sep 10 08:56:16 2007
@@ -30,9 +30,12 @@
  */
 public interface HScannerInterface {
   /**
-   * Get the next set of values
+   * Grab the next row's worth of values. The scanner will return the most
+   * recent data value for each row that is not newer than the target time
+   * passed when the scanner was created.
    * @param key will contain the row and timestamp upon return
-   * @param results will contain an entry for each column family member and its value
+   * @param results will contain an entry for each column family member and its
+   * value
    * @return true if data was returned
    * @throws IOException
    */

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Mon Sep 10 08:56:16 2007
@@ -24,6 +24,9 @@
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -86,7 +89,14 @@
 
   final HLocking lock = new HLocking();
 
+  /* Sorted Map of store files keyed by sequence id (most recent should be
+   * last in the map).
+   */
   TreeMap<Long, HStoreFile> storefiles = new TreeMap<Long, HStoreFile>();
+  
+  /* Sorted Map of readers keyed by sequence id (most recent should be last
+   * in the map).
+   */
   TreeMap<Long, MapFile.Reader> readers = new TreeMap<Long, MapFile.Reader>();
 
   Random rand = new Random();
@@ -176,7 +186,7 @@
     // MapFiles are in a reliable state.  Every entry in 'mapdir' must have a 
     // corresponding one in 'loginfodir'. Without a corresponding log info
     // file, the entry in 'mapdir' must be deleted.
-    Vector<HStoreFile> hstoreFiles 
+    Collection<HStoreFile> hstoreFiles 
       = HStoreFile.loadHStoreFiles(conf, dir, regionName, familyName, fs);
     for(HStoreFile hsf: hstoreFiles) {
       this.storefiles.put(Long.valueOf(hsf.loadInfo(fs)), hsf);
@@ -446,30 +456,23 @@
       MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression,
         this.bloomFilter);
       
-      // hbase.hstore.compact.on.flush=true enables picking up an existing
-      // HStoreFIle from disk interlacing the memcache flush compacting as we
-      // go.  The notion is that interlacing would take as long as a pure
-      // flush with the added benefit of having one less file in the store. 
-      // Experiments show that it takes two to three times the amount of time
-      // flushing -- more column families makes it so the two timings come
-      // closer together -- but it also complicates the flush. Disabled for
-      // now.  Needs work picking which file to interlace (favor references
-      // first, etc.)
+      // Here we tried picking up an existing HStoreFile from disk and
+      // interlacing the memcache flush compacting as we go.  The notion was
+      // that interlacing would take as long as a pure flush with the added
+      // benefit of having one less file in the store.  Experiments showed that
+      // it takes two to three times the amount of time flushing -- more column
+      // families makes it so the two timings come closer together -- but it
+      // also complicates the flush. The code was removed.  Needed work picking
+      // which file to interlace (favor references first, etc.)
       //
       // Related, looks like 'merging compactions' in BigTable paper interlaces
       // a memcache flush.  We don't.
       try {
-        if (this.conf.getBoolean("hbase.hstore.compact.on.flush", false) &&
-            this.storefiles.size() > 0) {
-          compact(out, inputCache.entrySet().iterator(),
-              this.readers.get(this.storefiles.firstKey()));
-        } else {
-          for (Map.Entry<HStoreKey, byte []> es: inputCache.entrySet()) {
-            HStoreKey curkey = es.getKey();
-            if (this.familyName.
-                equals(HStoreKey.extractFamily(curkey.getColumn()))) {
-              out.append(curkey, new ImmutableBytesWritable(es.getValue()));
-            }
+        for (Map.Entry<HStoreKey, byte []> es: inputCache.entrySet()) {
+          HStoreKey curkey = es.getKey();
+          if (this.familyName.
+              equals(HStoreKey.extractFamily(curkey.getColumn()))) {
+            out.append(curkey, new ImmutableBytesWritable(es.getValue()));
           }
         }
       } finally {
@@ -546,7 +549,6 @@
    * 
    * We don't want to hold the structureLock for the whole time, as a compact() 
    * can be lengthy and we want to allow cache-flushes during this period.
-   * 
    * @throws IOException
    */
   void compact() throws IOException {
@@ -564,6 +566,8 @@
    * @param maxSeenSeqID We may have already calculated the maxSeenSeqID.  If
    * so, pass it here.  Otherwise, pass -1 and it will be calculated inside in
    * this method.
+   * @param deleteSequenceInfo
+   * @param maxSeenSeqID
    * @throws IOException
    */
   void compactHelper(final boolean deleteSequenceInfo, long maxSeenSeqID)
@@ -584,7 +588,7 @@
         }
       }
       try {
-        Vector<HStoreFile> toCompactFiles = getFilesToCompact();
+        List<HStoreFile> toCompactFiles = getFilesToCompact();
         HStoreFile compactedOutputFile =
           new HStoreFile(conf, this.compactionDir, regionName, familyName, -1);
         if (toCompactFiles.size() < 1 ||
@@ -664,17 +668,21 @@
   }
   
   /*
-   * @return list of files to compact
+   * @return list of files to compact sorted so most recent comes first.
    */
-  private Vector<HStoreFile> getFilesToCompact() {
-    Vector<HStoreFile> toCompactFiles = null;
+  private List<HStoreFile> getFilesToCompact() {
+    List<HStoreFile> filesToCompact = null;
     this.lock.obtainWriteLock();
     try {
-      toCompactFiles = new Vector<HStoreFile>(storefiles.values());
+      // Storefiles are keyed by sequence id.  The oldest file comes first.
+      // We need to return out of here a List that has the newest file as
+      // first.
+      filesToCompact = new ArrayList<HStoreFile>(this.storefiles.values());
+      Collections.reverse(filesToCompact);
     } finally {
       this.lock.releaseWriteLock();
     }
-    return toCompactFiles;
+    return filesToCompact;
   }
   
   /*
@@ -694,7 +702,7 @@
    * @throws IOException
    */
   void compact(final MapFile.Writer compactedOut,
-      final Vector<HStoreFile> toCompactFiles)
+      final List<HStoreFile> toCompactFiles)
   throws IOException {
     int size = toCompactFiles.size();
     CompactionReader[] rdrs = new CompactionReader[size];
@@ -842,8 +850,14 @@
     int timesSeen = 0;
     Text lastRow = new Text();
     Text lastColumn = new Text();
-    while(numDone < done.length) {
-      // Find the reader with the smallest key
+    // Map of a row deletes keyed by column with a list of timestamps for value
+    Map<Text, List<Long>> deletes = null;
+    while (numDone < done.length) {
+      // Find the reader with the smallest key.  If two files have same key
+      // but different values -- i.e. one is delete and other is non-delete
+      // value -- we will find the first, the one that was written later and
+      // therefore the one whose value should make it out to the compacted
+      // store file.
       int smallestKey = -1;
       for(int i = 0; i < rdrs.length; i++) {
         if(done[i]) {
@@ -865,24 +879,23 @@
         timesSeen++;
       } else {
         timesSeen = 1;
+        // We are on to a new row.  Create a new deletes list.
+        deletes = new HashMap<Text, List<Long>>();
       }
 
-      if(timesSeen <= family.getMaxVersions()) {
+      byte [] value = (vals[smallestKey] == null)?
+        null: vals[smallestKey].get();
+      if (!isDeleted(sk, value, null, deletes) &&
+          timesSeen <= family.getMaxVersions()) {
         // Keep old versions until we have maxVersions worth.
         // Then just skip them.
-        if(sk.getRow().getLength() != 0
-            && sk.getColumn().getLength() != 0) {
+        if (sk.getRow().getLength() != 0 && sk.getColumn().getLength() != 0) {
           // Only write out objects which have a non-zero length key and
           // value
           compactedOut.append(sk, vals[smallestKey]);
         }
       }
 
-      // TODO: I don't know what to do about deleted values.  I currently 
-      // include the fact that the item was deleted as a legitimate 
-      // "version" of the data.  Maybe it should just drop the deleted
-      // val?
-
       // Update last-seen items
       lastRow.set(sk.getRow());
       lastColumn.set(sk.getColumn());
@@ -900,6 +913,52 @@
   }
 
   /*
+   * Check if this cell is deleted.
+   * If given a memcache or deletes map, check key has no entry in either.
+   * Otherwise, check value is not the <code>HGlobals.deleteBytes</code> value.
+   * If passed value IS deleteBytes, then it is added to the passed
+   * deletes map.
+   * @param hsk
+   * @param value
+   * @param memcache Can be null.
+   * @param deletes Map keyed by column with a value of timestamp. Can be null.
+   * If non-null and passed value is HGlobals.deleteBytes, then we add to this
+   * map.
+   * @return True if this is a deleted cell.  Adds to the passed deletes map
+   * if passed value is HGlobals.deleteBytes.
+  */
+  private boolean isDeleted(final HStoreKey hsk, final byte [] value,
+      final HMemcache memcache, final Map<Text, List<Long>> deletes) {
+    if (memcache != null && memcache.isDeleted(hsk)) {
+      return true;
+    }
+    List<Long> timestamps = (deletes == null)?
+      null: deletes.get(hsk.getColumn());
+    if (timestamps != null &&
+      timestamps.contains(Long.valueOf(hsk.getTimestamp()))) {
+      return true;
+    }
+    if (value == null) {
+      // If a null value, shouldn't be in here.  Mark it as deleted cell.
+      return true;
+    }
+    if (!HGlobals.deleteBytes.equals(value)) {
+      return false;
+    }
+    // Cell has delete value.  Save it into deletes.
+    if (deletes != null) {
+      if (timestamps == null) {
+        timestamps = new ArrayList<Long>();
+        deletes.put(hsk.getColumn(), timestamps);
+      }
+      // We know its not already in the deletes array else we'd have returned
+      // earlier so no need to test if timestamps already has this value.
+      timestamps.add(Long.valueOf(hsk.getTimestamp()));
+    }
+    return true;
+  }
+  
+  /*
    * It's assumed that the compactLock  will be acquired prior to calling this 
    * method!  Otherwise, it is not thread-safe!
    *
@@ -1061,22 +1120,37 @@
    * previous 'numVersions-1' values, as well.
    *
    * If 'numVersions' is negative, the method returns all available versions.
+   * @param key
+   * @param numVersions Number of versions to fetch.  Must be > 0.
+   * @param memcache Checked for deletions
+   * @return
+   * @throws IOException
    */
-  byte [][] get(HStoreKey key, int numVersions) throws IOException {
+  byte [][] get(HStoreKey key, int numVersions, final HMemcache memcache)
+  throws IOException {
     if (numVersions <= 0) {
       throw new IllegalArgumentException("Number of versions must be > 0");
     }
     
     List<byte []> results = new ArrayList<byte []>();
+    // Keep a list of deleted cell keys.  We need this because as we go through
+    // the store files, the cell with the delete marker may be in one file and
+    // the old non-delete cell value in a later store file. If we don't keep
+    // around the fact that the cell was deleted in a newer record, we end up
+    // returning the old value if user is asking for more than one version.
+    // This List of deletes should not be large since we are only keeping rows
+    // and columns that match those set on the scanner and which have delete
+    // values.  If memory usage becomes an issue, could redo as bloom filter.
+    Map<Text, List<Long>> deletes = new HashMap<Text, List<Long>>();
+    // This code below is very close to the body of the getKeys method.
     this.lock.obtainReadLock();
     try {
       MapFile.Reader[] maparray = getReaders();
       for(int i = maparray.length - 1; i >= 0; i--) {
         MapFile.Reader map = maparray[i];
-
         synchronized(map) {
-          ImmutableBytesWritable readval = new ImmutableBytesWritable();
           map.reset();
+          ImmutableBytesWritable readval = new ImmutableBytesWritable();
           HStoreKey readkey = (HStoreKey)map.getClosest(key, readval);
           if (readkey == null) {
             // map.getClosest returns null if the passed key is > than the
@@ -1085,29 +1159,102 @@
             // BEFORE.
             continue;
           }
-          if (readkey.matchesRowCol(key)) {
-            if(readval.equals(HGlobals.deleteBytes)) {
+          if (!readkey.matchesRowCol(key)) {
+            continue;
+          }
+          if (!isDeleted(readkey, readval.get(), memcache, deletes)) {
+            results.add(readval.get());
+            // Perhaps only one version is wanted.  I could let this
+            // test happen later in the for loop test but it would cost
+            // the allocation of an ImmutableBytesWritable.
+            if (hasEnoughVersions(numVersions, results)) {
               break;
             }
-            results.add(readval.get());
-            readval = new ImmutableBytesWritable();
-            while(map.next(readkey, readval) && readkey.matchesRowCol(key)) {
-              if ((numVersions > 0 && (results.size() >= numVersions))
-                  || readval.equals(HGlobals.deleteBytes)) {
-                break;
-              }
+          }
+          while ((readval = new ImmutableBytesWritable()) != null &&
+              map.next(readkey, readval) &&
+              readkey.matchesRowCol(key) &&
+              !hasEnoughVersions(numVersions, results)) {
+            if (!isDeleted(readkey, readval.get(), memcache, deletes)) {
               results.add(readval.get());
-              readval = new ImmutableBytesWritable();
             }
           }
         }
-        if(results.size() >= numVersions) {
+        if (hasEnoughVersions(numVersions, results)) {
           break;
         }
       }
-
       return results.size() == 0 ?
         null : ImmutableBytesWritable.toArray(results);
+    } finally {
+      this.lock.releaseReadLock();
+    }
+  }
+  
+  private boolean hasEnoughVersions(final int numVersions,
+      final List<byte []> results) {
+    return numVersions > 0 && results.size() >= numVersions;
+  }
+
+  /**
+   * Get <code>versions</code> keys matching the origin key's
+   * row/column/timestamp and those of an older vintage.
+   * Default access so can be accessed out of {@link HRegionServer}.
+   * @param origin Where to start searching.
+   * @param versions How many versions to return. Pass
+   * {@link HConstants#ALL_VERSIONS} to retrieve all. Versions will include
+   * size of passed <code>allKeys</code> in its count.
+   * @param allKeys List of keys prepopulated by keys we found in memcache.
+   * This method returns this passed list with all matching keys found in
+   * stores appended.
+   * @return The passed <code>allKeys</code> with <code>versions</code> of
+   * matching keys found in store files appended.
+   * @throws IOException
+   */
+  List<HStoreKey> getKeys(final HStoreKey origin, List<HStoreKey> allKeys,
+      final int versions)
+  throws IOException {
+    if (allKeys == null) {
+      allKeys = new ArrayList<HStoreKey>();
+    }
+    // This code below is very close to the body of the get method.
+    this.lock.obtainReadLock();
+    try {
+      MapFile.Reader[] maparray = getReaders();
+      for(int i = maparray.length - 1; i >= 0; i--) {
+        MapFile.Reader map = maparray[i];
+        synchronized(map) {
+          map.reset();
+          ImmutableBytesWritable readval = new ImmutableBytesWritable();
+          HStoreKey readkey = (HStoreKey)map.getClosest(origin, readval);
+          if (readkey == null) {
+            // map.getClosest returns null if the passed key is > than the
+            // last key in the map file.  getClosest is a bit of a misnomer
+            // since it returns exact match or the next closest key AFTER not
+            // BEFORE.
+            continue;
+          }
+          if (!readkey.matchesRowCol(origin)) {
+            continue;
+          }
+          if (!isDeleted(readkey, readval.get(), null, null) &&
+              !allKeys.contains(readkey)) {
+            allKeys.add(new HStoreKey(readkey));
+          }
+          while ((readval = new ImmutableBytesWritable()) != null &&
+              map.next(readkey, readval) &&
+              readkey.matchesRowCol(origin)) {
+            if (!isDeleted(readkey, readval.get(), null, null) &&
+                !allKeys.contains(readkey)) {
+              allKeys.add(new HStoreKey(readkey));
+              if (versions != ALL_VERSIONS && allKeys.size() >= versions) {
+                break;
+              }
+            }
+          }
+        }
+      }
+      return allKeys;
     } finally {
       this.lock.releaseReadLock();
     }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java Mon Sep 10 08:56:16 2007
@@ -35,7 +35,6 @@
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -531,10 +530,11 @@
   }
   
   /** 
-   * Delete the value for a column
-   *
-   * @param lockid              - lock id returned from startUpdate
-   * @param column              - name of column whose value is to be deleted
+   * Delete the value for a column.
+   * Deletes the cell whose row/column/commit-timestamp match those of the
+   * delete.
+   * @param lockid lock id returned from startUpdate
+   * @param column name of column whose value is to be deleted
    */
   public void delete(long lockid, Text column) {
     checkClosed();
@@ -543,9 +543,59 @@
   }
   
   /** 
+   * Delete all values for a column
+   * 
+   * @param row Row to update
+   * @param column name of column whose value is to be deleted
+   * @throws IOException 
+   */
+  public void deleteAll(final Text row, final Text column) throws IOException {
+    deleteAll(row, column, LATEST_TIMESTAMP);
+  }
+  
+  /** 
+   * Delete all values for a column
+   * 
+   * @param row Row to update
+   * @param column name of column whose value is to be deleted
+   * @param ts Delete all cells of the same timestamp or older.
+   * @throws IOException 
+   */
+  public void deleteAll(final Text row, final Text column, final long ts)
+  throws IOException {
+    checkClosed();
+    for(int tries = 0; tries < numRetries; tries++) {
+      HRegionLocation r = getRegionLocation(row);
+      HRegionInterface server =
+        connection.getHRegionConnection(r.getServerAddress());
+      try {
+        server.deleteAll(r.getRegionInfo().getRegionName(), row, column, ts);
+        break;
+        
+      } catch (IOException e) {
+        if (e instanceof RemoteException) {
+          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+        }
+        if (tries == numRetries - 1) {
+          throw e;
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("reloading table servers because: " + e.getMessage());
+        }
+        tableServers = connection.reloadTableServers(tableName);
+      }
+      try {
+        Thread.sleep(this.pause);
+      } catch (InterruptedException x) {
+        // continue
+      }
+    }
+  }
+  
+  /** 
    * Abort a row mutation
    *
-   * @param lockid              - lock id returned from startUpdate
+   * @param lockid lock id returned from startUpdate
    */
   public synchronized void abort(long lockid) {
     checkClosed();
@@ -558,24 +608,26 @@
   
   /** 
    * Finalize a row mutation
-   *
-   * @param lockid              - lock id returned from startUpdate
+   * When this method is called, we pass the server a value that says use
+   * the 'latest' timestamp.  If we are doing a put, on the server-side, cells
+   * will be given the server's current timestamp.  If we are committing
+   * deletes, then delete removes the most recently modified cell of stipulated
+   * column.
+   * @param lockid lock id returned from startUpdate
    * @throws IOException
    */
   public void commit(long lockid) throws IOException {
-    commit(lockid, System.currentTimeMillis());
+    commit(lockid, LATEST_TIMESTAMP);
   }
 
   /** 
    * Finalize a row mutation
-   *
-   * @param lockid              - lock id returned from startUpdate
-   * @param timestamp           - time to associate with the change
+   * @param lockid lock id returned from startUpdate
+   * @param timestamp time to associate with the change
    * @throws IOException
    */
   public synchronized void commit(long lockid, long timestamp)
   throws IOException {
-    
     checkClosed();
     updateInProgress(true);
     if (batch.get().getLockid() != lockid) {

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchOperation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchOperation.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchOperation.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchOperation.java Mon Sep 10 08:56:16 2007
@@ -27,47 +27,53 @@
 import org.apache.hadoop.io.Writable;
 
 /**
- * batch update operation
+ * Batch update operations such as put, delete, and deleteAll.
  */
 public class BatchOperation implements Writable {
-  /** put operation */
-  public static final int PUT_OP = 1;
-  
-  /** delete operation */
-  public static final int DELETE_OP = 2;
-  
-  private int op;
+  /** 
+   * Operation types.
+   * @see org.apache.hadoop.io.SequenceFile.Writer
+   */
+  public static enum Operation {PUT, DELETE}
+
+  private Operation op;
   private Text column;
   private byte[] value;
   
   /** default constructor used by Writable */
   public BatchOperation() {
-    this.op = 0;
-    this.column = new Text();
-    this.value = null;
+    this(new Text());
   }
-  
   /**
-   * Creates a put operation
+   * Creates a DELETE operation
+   * 
+   * @param column column name
+   */
+  public BatchOperation(final Text column) {
+    this(Operation.DELETE, column, null);
+  }
+
+  /**
+   * Creates a PUT operation
    * 
    * @param column column name
    * @param value column value
    */
-  public BatchOperation(Text column, byte[] value) {
-    this.op = PUT_OP;
-    this.column = column;
-    this.value = value;
+  public BatchOperation(final Text column, final byte [] value) {
+    this(Operation.PUT, column, value);
   }
   
   /**
-   * Creates a delete operation
+   * Creates an operation of the passed type
    * 
-   * @param column name of column to delete
+   * @param column column name
+   * @param value column value
    */
-  public BatchOperation(Text column) {
-    this.op = DELETE_OP;
+  public BatchOperation(final Operation operation, final Text column,
+      final byte[] value) {
+    this.op = operation;
     this.column = column;
-    this.value = null;
+    this.value = value;
   }
 
   /**
@@ -80,8 +86,8 @@
   /**
    * @return the operation
    */
-  public int getOp() {
-    return op;
+  public Operation getOp() {
+    return this.op;
   }
 
   /**
@@ -99,9 +105,10 @@
    * {@inheritDoc}
    */
   public void readFields(DataInput in) throws IOException {
-    op = in.readInt();
+    int ordinal = in.readInt();
+    this.op = Operation.values()[ordinal];
     column.readFields(in);
-    if(op == PUT_OP) {
+    if (this.op == Operation.PUT) {
       value = new byte[in.readInt()];
       in.readFully(value);
     }
@@ -111,11 +118,11 @@
    * {@inheritDoc}
    */
   public void write(DataOutput out) throws IOException {
-    out.writeInt(op);
+    out.writeInt(this.op.ordinal());
     column.write(out);
-    if(op == PUT_OP) {
+    if (this.op == Operation.PUT) {
       out.writeInt(value.length);
       out.write(value);
     }
   }
-}
+}
\ No newline at end of file

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java Mon Sep 10 08:56:16 2007
@@ -61,7 +61,7 @@
    */
   public BatchUpdate(long lockid) {
     this.row = new Text();
-    this.lockid = Long.valueOf(Math.abs(lockid));
+    this.lockid = Math.abs(lockid);
     this.operations = new ArrayList<BatchOperation>();
   }
 
@@ -97,27 +97,28 @@
   /** 
    * Change a value for the specified column
    *
-   * @param lockid              - lock id returned from startUpdate
-   * @param column              - column whose value is being set
-   * @param val                 - new value for column
+   * @param lid lock id returned from startUpdate
+   * @param column column whose value is being set
+   * @param val new value for column
    */
-  public synchronized void put(final long lockid, final Text column,
+  public synchronized void put(final long lid, final Text column,
       final byte val[]) {
-    if(this.lockid != lockid) {
-      throw new IllegalArgumentException("invalid lockid " + lockid);
+    if(this.lockid != lid) {
+      throw new IllegalArgumentException("invalid lockid " + lid);
     }
     operations.add(new BatchOperation(column, val));
   }
   
   /** 
    * Delete the value for a column
-   *
-   * @param lockid              - lock id returned from startUpdate
-   * @param column              - name of column whose value is to be deleted
+   * Deletes the cell whose row/column/commit-timestamp match those of the
+   * delete.
+   * @param lid lock id returned from startUpdate
+   * @param column name of column whose value is to be deleted
    */
-  public synchronized void delete(final long lockid, final Text column) {
-    if(this.lockid != lockid) {
-      throw new IllegalArgumentException("invalid lockid " + lockid);
+  public synchronized void delete(final long lid, final Text column) {
+    if(this.lockid != lid) {
+      throw new IllegalArgumentException("invalid lockid " + lid);
     }
     operations.add(new BatchOperation(column));
   }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java Mon Sep 10 08:56:16 2007
@@ -153,6 +153,9 @@
   /** {@inheritDoc} */
   @Override
   public boolean equals(Object right_obj) {
+    if (right_obj instanceof byte []) {
+      return compareTo((byte [])right_obj) == 0;
+    }
     if (right_obj instanceof ImmutableBytesWritable) {
       return compareTo(right_obj) == 0;
     }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java Mon Sep 10 08:56:16 2007
@@ -26,6 +26,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor.CompressionType;
 import org.apache.hadoop.io.Text;
 
 /**
@@ -41,6 +42,7 @@
   protected static final char LAST_CHAR = 'z';
   protected static final byte [] START_KEY_BYTES =
     {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
+  protected static final int MAXVERSIONS = 3;
   
   static {
     StaticTestEnvironment.initialize();
@@ -100,10 +102,18 @@
   }
   
   protected HTableDescriptor createTableDescriptor(final String name) {
+    return createTableDescriptor(name, MAXVERSIONS);
+  }
+  
+  protected HTableDescriptor createTableDescriptor(final String name,
+      final int versions) {
     HTableDescriptor htd = new HTableDescriptor(name);
-    htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME1));
-    htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME2));
-    htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME3));
+    htd.addFamily(new HColumnDescriptor(new Text(COLFAMILY_NAME1), versions,
+      CompressionType.NONE, false, Integer.MAX_VALUE, null));
+    htd.addFamily(new HColumnDescriptor(new Text(COLFAMILY_NAME2), versions,
+      CompressionType.NONE, false, Integer.MAX_VALUE, null));
+    htd.addFamily(new HColumnDescriptor(new Text(COLFAMILY_NAME3), versions,
+      CompressionType.NONE, false, Integer.MAX_VALUE, null));
     return htd;
   }
   
@@ -123,18 +133,18 @@
     if (startKeyBytes == null || startKeyBytes.length == 0) {
       startKeyBytes = START_KEY_BYTES;
     }
-    addContent(new HRegionLoader(r), column, startKeyBytes, endKey, -1);
+    addContent(new HRegionIncommon(r), column, startKeyBytes, endKey, -1);
   }
 
   /**
    * Add content to region <code>r</code> on the passed column
    * <code>column</code>.
    * Adds data of the from 'aaa', 'aab', etc where key and value are the same.
-   * @param updater  An instance of {@link Loader}.
+   * @param updater  An instance of {@link Incommon}.
    * @param column
    * @throws IOException
    */
-  protected static void addContent(final Loader updater, final String column)
+  protected static void addContent(final Incommon updater, final String column)
   throws IOException {
     addContent(updater, column, START_KEY_BYTES, null);
   }
@@ -143,13 +153,13 @@
    * Add content to region <code>r</code> on the passed column
    * <code>column</code>.
    * Adds data of the from 'aaa', 'aab', etc where key and value are the same.
-   * @param updater  An instance of {@link Loader}.
+   * @param updater  An instance of {@link Incommon}.
    * @param column
    * @param startKeyBytes Where to start the rows inserted
    * @param endKey Where to stop inserting rows.
    * @throws IOException
    */
-  protected static void addContent(final Loader updater, final String column,
+  protected static void addContent(final Incommon updater, final String column,
       final byte [] startKeyBytes, final Text endKey)
   throws IOException {
     addContent(updater, column, startKeyBytes, endKey, -1);
@@ -159,14 +169,14 @@
    * Add content to region <code>r</code> on the passed column
    * <code>column</code>.
    * Adds data of the from 'aaa', 'aab', etc where key and value are the same.
-   * @param updater  An instance of {@link Loader}.
+   * @param updater  An instance of {@link Incommon}.
    * @param column
    * @param startKeyBytes Where to start the rows inserted
    * @param endKey Where to stop inserting rows.
    * @param ts Timestamp to write the content with.
    * @throws IOException
    */
-  protected static void addContent(final Loader updater, final String column,
+  protected static void addContent(final Incommon updater, final String column,
       final byte [] startKeyBytes, final Text endKey, final long ts)
   throws IOException {
     // Add rows of three characters.  The first character starts with the
@@ -207,23 +217,42 @@
   }
   
   /**
-   * Interface used by the addContent methods so either a HTable or a HRegion
-   * can be passed to the methods.
+   * Implementors can flushcache.
+   */
+  public static interface FlushCache {
+    public void flushcache() throws IOException;
+  }
+  
+  /**
+   * Interface used by tests so can do common operations against an HTable
+   * or an HRegion.
+   * 
+   * TODO: Come up w/ a better name for this interface.
    */
-  public static interface Loader {
+  public static interface Incommon {
+    public byte [] get(Text row, Text column) throws IOException;
+    public byte [][] get(Text row, Text column, int versions)
+    throws IOException;
+    public byte [][] get(Text row, Text column, long ts, int versions)
+    throws IOException;
     public long startBatchUpdate(final Text row) throws IOException;
     public void put(long lockid, Text column, byte val[]) throws IOException;
+    public void delete(long lockid, Text column) throws IOException;
+    public void deleteAll(Text row, Text column, long ts) throws IOException;
     public void commit(long lockid) throws IOException;
     public void commit(long lockid, long ts) throws IOException;
     public void abort(long lockid) throws IOException;
+    public HScannerInterface getScanner(Text [] columns, Text firstRow,
+      long ts)
+    throws IOException;
   }
   
   /**
-   * A class that makes a {@link Loader} out of a {@link HRegion}
+   * A class that makes a {@link Incommon} out of a {@link HRegion}
    */
-  public static class HRegionLoader implements Loader {
+  public static class HRegionIncommon implements Incommon {
     final HRegion region;
-    public HRegionLoader(final HRegion HRegion) {
+    public HRegionIncommon(final HRegion HRegion) {
       super();
       this.region = HRegion;
     }
@@ -231,7 +260,7 @@
       this.region.abort(lockid);
     }
     public void commit(long lockid) throws IOException {
-      this.region.commit(lockid, System.currentTimeMillis());
+      this.region.commit(lockid);
     }
     public void commit(long lockid, final long ts) throws IOException {
       this.region.commit(lockid, ts);
@@ -239,17 +268,38 @@
     public void put(long lockid, Text column, byte[] val) throws IOException {
       this.region.put(lockid, column, val);
     }
+    public void delete(long lockid, Text column) throws IOException {
+      this.region.delete(lockid, column);
+    }
+    public void deleteAll(Text row, Text column, long ts) throws IOException {
+      this.region.deleteAll(row, column, ts);
+    }
     public long startBatchUpdate(Text row) throws IOException {
       return this.region.startUpdate(row);
     }
+    public HScannerInterface getScanner(Text [] columns, Text firstRow,
+        long ts)
+    throws IOException {
+      return this.region.getScanner(columns, firstRow, ts, null);
+    }
+    public byte[] get(Text row, Text column) throws IOException {
+      return this.region.get(row, column);
+    }
+    public byte[][] get(Text row, Text column, int versions) throws IOException {
+      return this.region.get(row, column, versions);
+    }
+    public byte[][] get(Text row, Text column, long ts, int versions)
+        throws IOException {
+      return this.region.get(row, column, ts, versions);
+    }
   }
 
   /**
-   * A class that makes a {@link Loader} out of a {@link HTable}
+   * A class that makes a {@link Incommon} out of a {@link HTable}
    */
-  public static class HTableLoader implements Loader {
+  public static class HTableIncommon implements Incommon {
     final HTable table;
-    public HTableLoader(final HTable table) {
+    public HTableIncommon(final HTable table) {
       super();
       this.table = table;
     }
@@ -265,8 +315,30 @@
     public void put(long lockid, Text column, byte[] val) throws IOException {
       this.table.put(lockid, column, val);
     }
+    public void delete(long lockid, Text column) throws IOException {
+      this.table.delete(lockid, column);
+    }
+    public void deleteAll(Text row, Text column, long ts) throws IOException {
+      this.table.deleteAll(row, column, ts);
+    }
     public long startBatchUpdate(Text row) {
       return this.table.startUpdate(row);
+    }
+    public HScannerInterface getScanner(Text [] columns, Text firstRow,
+        long ts)
+    throws IOException {
+      return this.table.obtainScanner(columns, firstRow, ts, null);
+    }
+    public byte[] get(Text row, Text column) throws IOException {
+      return this.table.get(row, column);
+    }
+    public byte[][] get(Text row, Text column, int versions)
+    throws IOException {
+      return this.table.get(row, column, versions);
+    }
+    public byte[][] get(Text row, Text column, long ts, int versions)
+    throws IOException {
+      return this.table.get(row, column, ts, versions);
     }
   }
 }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java Mon Sep 10 08:56:16 2007
@@ -417,4 +417,15 @@
     }
     f.delete();
   }
+  
+  /**
+   * Call flushcache on all online regions of the first regionserver only.
+   * @throws IOException
+   */
+  void flushcache() throws IOException {
+    HRegionServer s = this.regionThreads.get(0).getRegionServer();
+    for(HRegion r: s.onlineRegions.values() ) {
+      r.flushcache(false);
+    }
+  }
 }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java Mon Sep 10 08:56:16 2007
@@ -54,7 +54,7 @@
     HTable meta = new HTable(conf, HConstants.META_TABLE_NAME);
     int count = count(meta, HConstants.COLUMN_FAMILY_STR);
     HTable t = new HTable(conf, new Text(tableName));
-    addContent(new HTableLoader(t), columnName);
+    addContent(new HTableIncommon(t), columnName);
     
     // All is running in the one JVM so I should be able to get the single
     // region instance and bring on a split.

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java Mon Sep 10 08:56:16 2007
@@ -23,71 +23,133 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.Text;
 
 /**
  * Test compactions
  */
 public class TestCompaction extends HBaseTestCase {
   static final Log LOG = LogFactory.getLog(TestCompaction.class.getName());
-
+  private HLog hlog = null;
+  private HRegion r = null;
+  private static final String COLUMN_FAMILY = COLFAMILY_NAME1;
+  private static final Text STARTROW = new Text(START_KEY_BYTES);
+  private static final Text COLUMN_FAMILY_TEXT = new Text(COLUMN_FAMILY);
+  private static final Text COLUMN_FAMILY_TEXT_MINUS_COLON =
+    new Text(COLUMN_FAMILY.substring(0, COLUMN_FAMILY.length() - 1));
+  private static final int COMPACTION_THRESHOLD = MAXVERSIONS;
+  
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    this.hlog = new HLog(this.localFs, this.testDir, this.conf);
+    HTableDescriptor htd = createTableDescriptor(getName());
+    HRegionInfo hri = new HRegionInfo(1, htd, null, null);
+    this.r = new HRegion(testDir, hlog, this.localFs, this.conf, hri, null);
+  }
+  
+  @Override
+  public void tearDown() throws Exception {
+    this.r.close();
+    this.hlog.closeAndDelete();
+    super.tearDown();
+  }
+  
   /**
    * Run compaction and flushing memcache
+   * Assert deletes get cleaned up.
    * @throws Exception
    */
   public void testCompaction() throws Exception {
-    HLog hlog = new HLog(this.localFs, this.testDir, this.conf);
-    HTableDescriptor htd = createTableDescriptor(getName());
-    HRegionInfo hri = new HRegionInfo(1, htd, null, null);
-    final HRegion r =
-      new HRegion(testDir, hlog, this.localFs, this.conf, hri, null);
-    try {
+    createStoreFile(r);
+    assertFalse(r.needsCompaction());
+    for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
       createStoreFile(r);
-      assertFalse(r.needsCompaction());
-      int compactionThreshold =
-        this.conf.getInt("hbase.hstore.compactionThreshold", 3);
-      for (int i = 0; i < compactionThreshold; i++) {
-        createStoreFile(r);
-      }
-      assertTrue(r.needsCompaction());
-      // Try to run compaction concurrent with a thread flush.
-      addContent(new HRegionLoader(r), COLFAMILY_NAME1);
-      Thread t1 = new Thread() {
-        @Override
-        public void run() {
-          try {
-            r.flushcache(false);
-          } catch (IOException e) {
-            e.printStackTrace();
-          }
+    }
+    assertTrue(r.needsCompaction());
+    // Add more content.  Now there are about 5 versions of each column.
+    // Default is that there are only 3 (MAXVERSIONS) versions allowed per column.
+    // Assert > 3 and then after compaction, assert that only 3 versions
+    // available.
+    addContent(new HRegionIncommon(r), COLUMN_FAMILY);
+    byte [][] bytes = this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/);
+    // Assert that I can get >= 5 versions (Should be at least 5 in there).
+    assertTrue(bytes.length >= 5);
+    // Try to run compaction concurrent with a thread flush just to see that
+    // we can.
+    final HRegion region = this.r;
+    Thread t1 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          region.flushcache(false);
+        } catch (IOException e) {
+          e.printStackTrace();
         }
-      };
-      Thread t2 = new Thread() {
-        @Override
-        public void run() {
-          try {
-            assertTrue(r.compactStores());
-          } catch (IOException e) {
-            e.printStackTrace();
-          }
+      }
+    };
+    Thread t2 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          assertTrue(region.compactStores());
+        } catch (IOException e) {
+          e.printStackTrace();
         }
-      };
-      t1.setDaemon(true);
-      t1.start();
-      t2.setDaemon(true);
-      t2.start();
-      t1.join();
-      t2.join();
-    } finally {
-      r.close();
-      hlog.closeAndDelete();
+      }
+    };
+    t1.setDaemon(true);
+    t1.start();
+    t2.setDaemon(true);
+    t2.start();
+    t1.join();
+    t2.join();
+    // Now assert that there are 4 versions of a record only: that's the
+    // 3 versions that should be in the compacted store and then the one more
+    // we added when we compacted.
+    byte [] secondRowBytes = new byte[START_KEY_BYTES.length];
+    System.arraycopy(START_KEY_BYTES, 0, secondRowBytes, 0,
+      START_KEY_BYTES.length);
+    // Increment the least significant character so we get to next row.
+    secondRowBytes[START_KEY_BYTES.length - 1]++;
+    Text secondRow = new Text(secondRowBytes);
+    bytes = this.r.get(secondRow, COLUMN_FAMILY_TEXT, 100/*Too many*/);
+    assertTrue(bytes.length == 4);
+    // Now add deletes to memcache and then flush it.  That will put us over
+    // the compaction threshold of 3 store files.  Compacting these store files
+    // should result in a compacted store file that has no references to the
+    // deleted row.
+    this.r.deleteAll(STARTROW, COLUMN_FAMILY_TEXT, System.currentTimeMillis());
+    // Now, before compacting, remove all instances of the first row so can
+    // verify that it is removed as we compact.
+    // Assert all deleted.
+    assertNull(this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/));
+    this.r.flushcache(false);
+    assertNull(this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/));
+    assertTrue(this.r.needsCompaction());
+    this.r.compactStores();
+    // Assert that the first row is still deleted.
+    bytes = this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/);
+    assertNull(bytes);
+    // Assert the store files do not have the first record 'aaa' keys in them.
+    for (MapFile.Reader reader:
+        this.r.stores.get(COLUMN_FAMILY_TEXT_MINUS_COLON).readers.values()) {
+      reader.reset();
+      HStoreKey key = new HStoreKey();
+      ImmutableBytesWritable val = new ImmutableBytesWritable();
+      while(reader.next(key, val)) {
+        assertFalse(key.getRow().equals(STARTROW));
+      }
     }
   }
-  
-  private void createStoreFile(final HRegion r) throws IOException {
-    HRegionLoader loader = new HRegionLoader(r);
-    for (int i = 0; i < 3; i++) {
-      addContent(loader, COLFAMILY_NAME1);
+
+  private void createStoreFile(final HRegion region) throws IOException {
+    HRegionIncommon loader = new HRegionIncommon(region);
+    for (int i = 0; i < 1; i++) {
+      addContent(loader, COLUMN_FAMILY);
     }
-    r.flushcache(false);
+    region.flushcache(false);
   }
-}
\ No newline at end of file
+}

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestMasterAdmin.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestMasterAdmin.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestMasterAdmin.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestMasterAdmin.java Mon Sep 10 08:56:16 2007
@@ -45,32 +45,27 @@
     admin.disableTable(testDesc.getName());
 
     try {
-      try {
-        @SuppressWarnings("unused")
-        HTable table = new HTable(conf, testDesc.getName());
+      @SuppressWarnings("unused")
+      HTable table = new HTable(conf, testDesc.getName());
 
-      } catch(IllegalStateException e) {
-        // Expected
-      }
-
-      admin.addColumn(testDesc.getName(), new HColumnDescriptor("col2:"));
-      admin.enableTable(testDesc.getName());
-      try {
-        admin.deleteColumn(testDesc.getName(), new Text("col2:"));
-        
-      } catch(TableNotDisabledException e) {
-        // Expected
-      }
+    } catch(IllegalStateException e) {
+      // Expected
+      
+      // This exception is not actually thrown.  It doesn't look like it should
+      // be thrown since the connection manager is already filled w/ data
+      // -- noticed by St.Ack 09/09/2007
+    }
 
-      admin.disableTable(testDesc.getName());
+    admin.addColumn(testDesc.getName(), new HColumnDescriptor("col2:"));
+    admin.enableTable(testDesc.getName());
+    try {
       admin.deleteColumn(testDesc.getName(), new Text("col2:"));
-      
-    } catch(Exception e) {
-      e.printStackTrace();
-      fail();
-      
-    } finally {
-      admin.deleteTable(testDesc.getName());
+    } catch(TableNotDisabledException e) {
+      // Expected
     }
+
+    admin.disableTable(testDesc.getName());
+    admin.deleteColumn(testDesc.getName(), new Text("col2:"));
+    admin.deleteTable(testDesc.getName());
   }
 }