Posted to common-commits@hadoop.apache.org by st...@apache.org on 2007/08/16 03:07:53 UTC

svn commit: r566459 [1/2] - in /lucene/hadoop/trunk/src/contrib/hbase: ./ conf/ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/io/ src/java/org/apache/hadoop/hbase/util/ src/test/org/apache/hadoop/hbase/

Author: stack
Date: Wed Aug 15 18:07:51 2007
New Revision: 566459

URL: http://svn.apache.org/viewvc?view=rev&rev=566459
Log:
HADOOP-1644 [hbase] Compactions should not block updates

Disentangles flushes and compactions; flushes can proceed while a
compaction is happening.  Also, don't compact unless we hit the
compaction threshold: i.e. don't automatically compact on HRegion
startup, so regions can come online faster.

M src/contrib/hbase/conf/hbase-default.xml
    (hbase.hregion.compactionThreshold): Moved to be an hstore property
    as part of encapsulating the compaction decision inside hstore.
M src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
    Refactored.  Moved here generalized content loading code that can
    be shared by tests.  Add to setup and teardown the setup and removal
    of local test dir (if it exists).
M src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompare.java
    Added test of HStoreKey compare (it works differently than one would at
    first expect).
M src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java
    Bulk of content loading code has been moved up into the parent class.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java
    (tableExists): Restored to a check of whether the asked-for table is in the
    list of tables.  As it was, a check for tableExists would just wait for all
    timeouts and retries to expire and then report that the table does not exist.
    Fixed up debug message listing regions of a table.  Added protection against
    meta table not having a COL_REGIONINFO (seen in cluster testing -- probably a
    bug in row removal).
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java
    When loading store files, a file was still counted as valid even if it was
    noticed that it had no corresponding map file.  Also fixed merger -- it was
    constructing MapFile.Reader directly rather than asking HStoreFile for
    the reader (HStoreFile knows how to do MapFile references).
    (rename): Added check that move succeeded and logging.  In cluster-testing,
    the hdfs move of compacted file into place has failed on occasion (Need
    more info).
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
    Encapsulate ruling on whether a compaction should take place inside HStore.
    Added reading of the compactionThreshold here.  Compaction threshold is
    currently just number of store files.  Later may include other factors such
    as count of reference files.  Cleaned up debug messages around
    reconstruction log.  Removed compaction if size > 1 from constructor.  Let
    compaction happen after we've been deployed (Compactions that happen while
    we are online can continue to take updates.  Compaction in the constructor
    puts off our being able to take in updates).
    (close): Changed so it now returns the set of store files.  This used to be
    done by calls to flush.  Since flush and compaction have been disentangled, a
    compaction can come in after a flush and the list of files could be off.
    Having close return it, we can be sure the list of files is complete.
    (flushCache): No longer returns set of store files.  Added 'merging compaction'
    where we pick an arbitrary store file from disk and merge into it the content
    of memcache (Needs work).
    (getAllMapFiles): Renamed getAllStoreFiles.
    (needsCompaction): Added.
    (compactHelper): Added passing of maximum sequence number if already
    calculated.  If compacting one file only, we used to skip without rewriting
    the info file.  Fixed.
    Refactored.  Moved guts to new compact(outFile, listOfStores) method.
    (compact, CompactionReader): Added overrides and interface to support
    'merging compaction' that takes files and memcache.  In compaction,
    if we failed the move of the compacted file, all data had already been
    deleted.  Changing, so deletion happens after confirmed move of
    compacted file.
    (getFull): Fixed NPE thrown when a read of the maps came back null.
    Revealed by our NOT compacting stores on startup: there could now be two
    backing stores, one of which had no data for the queried key.
    (getNMaps): Renamed countOfStoreFiles.
    (toString): Added.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java
    Added comment on 'odd'-looking comparison.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
    Javadoc edit. 
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogEdit.java
    Only return first 128 bytes of value when toStringing (On cluster,
    was returning complete web pages in log).
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    Removed confusing debug message (made sense once -- but not now).
    Test rootRegionLocation for null before using it (can be null).
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
    Added comment that delete behavior needs study.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
    Fixed merge so it no longer works incrementally off the files
    returned by flush.  Instead, all is done in one go after the
    regions close (using the files returned by close).
    Moved duplicated code to new filesByFamily method.
    (WriteState): Removed writesOngoing in favor of compacting and
    flushing flags.
    (flushCache): No longer returns list of files.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Writables.java
    Fix javadoc.
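
The WriteState change above (separate flushing and compacting flags in
place of a single writesOngoing flag) can be illustrated with a minimal
sketch.  This is not the HRegion code from this commit: the class name
RegionWriteState and all of its methods are hypothetical, and it only
assumes the behavior the log describes, i.e. that a flush may start while
a compaction is running and that close waits for both to finish.

```java
// Hypothetical illustration only; names are not taken from HRegion.
class RegionWriteState {
  private boolean flushing = false;      // set while a memcache flush runs
  private boolean compacting = false;    // set while a compaction runs
  private boolean writesEnabled = true;  // cleared once close has begun

  // A flush is refused only if closing or another flush is in progress;
  // a running compaction does not block it.
  synchronized boolean startFlush() {
    if (!writesEnabled || flushing) {
      return false;
    }
    flushing = true;
    return true;
  }

  synchronized void finishFlush() {
    flushing = false;
    notifyAll();
  }

  // Likewise, a compaction is not blocked by a running flush.
  synchronized boolean startCompaction() {
    if (!writesEnabled || compacting) {
      return false;
    }
    compacting = true;
    return true;
  }

  synchronized void finishCompaction() {
    compacting = false;
    notifyAll();
  }

  // Called on close: wait for both kinds of background work to finish,
  // then refuse any further flushes or compactions.
  synchronized void disableWrites() {
    boolean interrupted = false;
    while (flushing || compacting) {
      try {
        wait();
      } catch (InterruptedException e) {
        interrupted = true;  // remember the interrupt and keep waiting
      }
    }
    writesEnabled = false;
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}
```

With a single flag, startFlush would have had to return false for the
whole duration of a compaction; with two flags the only shared state is
writesEnabled.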

Added:
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java
Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-default.xml
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnection.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogEdit.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Writables.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompare.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?view=diff&rev=566459&r1=566458&r2=566459
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Wed Aug 15 18:07:51 2007
@@ -95,3 +95,4 @@
  58. HADOOP-1710 All updates should be batch updates
  59. HADOOP-1711 HTable API should use interfaces instead of concrete classes as
      method parameters and return values
+ 60. HADOOP-1644 Compactions should not block updates

Modified: lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-default.xml?view=diff&rev=566459&r1=566458&r2=566459
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-default.xml (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-default.xml Wed Aug 15 18:07:51 2007
@@ -147,7 +147,7 @@
     </description>
   </property>
   <property>
-    <name>hbase.hregion.compactionThreshold</name>
+    <name>hbase.hstore.compactionThreshold</name>
     <value>3</value>
     <description>
     If more than this number of HStoreFiles in any one HStore
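
As a rough illustration of the renamed property, the sketch below reads
hbase.hstore.compactionThreshold with the default of 3 shown above and
applies it to a count of store files.  It is an assumption-laden stand-in,
not the HStore code: java.util.Properties replaces the Hadoop
Configuration object, the class CompactionThreshold is hypothetical, and
the strictly-greater comparison is inferred from the "more than this
number" wording of the description.

```java
import java.util.Properties;

// Hypothetical stand-in for the decision HStore now makes internally.
class CompactionThreshold {
  static final String KEY = "hbase.hstore.compactionThreshold";
  static final int DEFAULT = 3;  // default value from hbase-default.xml

  // Reads the threshold, falling back to the default when it is unset.
  static int read(Properties conf) {
    String raw = conf.getProperty(KEY);
    return raw == null ? DEFAULT : Integer.parseInt(raw.trim());
  }

  // Assumed rule: compact only when the store holds more files than
  // the threshold ("more than this number of HStoreFiles").
  static boolean needsCompaction(int storeFileCount, int threshold) {
    return storeFileCount > threshold;
  }
}
```

Under these assumptions a store with three files and the default
threshold is left alone, and a fourth file makes it eligible.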

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnection.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnection.java?view=diff&rev=566459&r1=566458&r2=566459
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnection.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnection.java Wed Aug 15 18:07:51 2007
@@ -38,6 +38,7 @@
   public boolean isMasterRunning();
   
   /**
+   * Checks if <code>tableName</code> exists.
    * @param tableName Table to check.
    * @return True if table exists already.
    */

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java?view=diff&rev=566459&r1=566458&r2=566459
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java Wed Aug 15 18:07:51 2007
@@ -44,7 +44,7 @@
  * multiple HBase instances
  */
 public class HConnectionManager implements HConstants {
-  private HConnectionManager(){}                        // Not instantiable
+  private HConnectionManager() {}                        // Not instantiable
   
   // A Map of master HServerAddress -> connection information for that instance
   // Note that although the Map is synchronized, the objects it contains
@@ -209,15 +209,19 @@
 
     /** {@inheritDoc} */
     public boolean tableExists(final Text tableName) {
-      boolean exists = true;
+      if (tableName == null) {
+        throw new IllegalArgumentException("Table name cannot be null");
+      }
+      boolean exists = false;
       try {
-        SortedMap<Text, HRegionLocation> servers = getTableServers(tableName);
-        if (servers == null || servers.size() == 0) {
-          exists = false;
+        HTableDescriptor[] tables = listTables();
+        for (int i = 0; i < tables.length; i++) {
+          if (tables[i].getName().equals(tableName)) {
+            exists = true;
+          }
         }
-
       } catch (IOException e) {
-        exists = false;
+        LOG.warn("Testing for table existence threw exception", e);
       }
       return exists;
     }
@@ -400,7 +404,6 @@
     throws IOException {
       
       // Wipe out everything we know about this table
-
       if (this.tablesToServers.remove(tableName) != null) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Wiping out all we know of " + tableName);
@@ -524,9 +527,10 @@
       }
       this.tablesToServers.put(tableName, servers);
       if (LOG.isDebugEnabled()) {
+        int count = 0;
         for (Map.Entry<Text, HRegionLocation> e: servers.entrySet()) {
-          LOG.debug("Server " + e.getKey() + " is serving: " + e.getValue() +
-              " for table " + tableName);
+          LOG.debug("Region " + (1 + count++) + " of " + servers.size() +
+            ": " + e.getValue());
         }
       }
       return servers;
@@ -650,40 +654,47 @@
         new TreeMap<Text, HRegionLocation>();
       
       for (int tries = 0; servers.size() == 0 && tries < numRetries; tries++) {
-
         long scannerId = -1L;
         try {
-          scannerId =
-            server.openScanner(t.getRegionInfo().getRegionName(),
-                COLUMN_FAMILY_ARRAY, tableName, System.currentTimeMillis(), null);
+          scannerId = server.openScanner(t.getRegionInfo().getRegionName(),
+            COLUMN_FAMILY_ARRAY, tableName, System.currentTimeMillis(), null);
 
           while (true) {
-            HRegionInfo regionInfo = null;
-            String serverAddress = null;
             KeyedData[] values = server.next(scannerId);
             if (values.length == 0) {
               if (servers.size() == 0) {
                 // If we didn't find any servers then the table does not exist
                 throw new TableNotFoundException("table '" + tableName +
-                    "' does not exist in " + t);
+                  "' does not exist in " + t);
               }
 
               // We found at least one server for the table and now we're done.
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Found " + servers.size() + " server(s) for " +
-                    "location: " + t + " for tablename " + tableName);
+                  tableName + " at " + t);
               }
               break;
             }
 
-            byte[] bytes = null;
             TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
             for (int i = 0; i < values.length; i++) {
               results.put(values[i].getKey().getColumn(), values[i].getData());
             }
-            regionInfo = new HRegionInfo();
-            regionInfo = (HRegionInfo) Writables.getWritable(
-                results.get(COL_REGIONINFO), regionInfo);
+            
+            byte[] bytes = results.get(COL_REGIONINFO);
+            if (bytes == null || bytes.length == 0) {
+              // This can be null.  Looks like an info:splitA or info:splitB
+              // is only item in the row.
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(COL_REGIONINFO.toString() + " came back empty: " +
+                  results.toString());
+              }
+              servers.clear();
+              break;
+            }
+            
+            HRegionInfo regionInfo = (HRegionInfo) Writables.getWritable(
+              results.get(COL_REGIONINFO), new HRegionInfo());
 
             if (!regionInfo.tableDesc.getName().equals(tableName)) {
               // We're done
@@ -707,7 +718,8 @@
               servers.clear();
               break;
             }
-            serverAddress = Writables.bytesToString(bytes);
+            
+            String serverAddress = Writables.bytesToString(bytes);
             servers.put(regionInfo.startKey, new HRegionLocation(
                 regionInfo, new HServerAddress(serverAddress)));
           }
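
The restored tableExists check in the hunk above amounts to a membership
test against the list of known tables.  A minimal sketch of that idea
follows, assuming plain strings in place of Text and HTableDescriptor;
the class TableLookup is hypothetical.  Unlike the loop in the diff, it
returns as soon as a match is found.

```java
import java.util.List;

// Hypothetical illustration; not the HConnectionManager code.
class TableLookup {
  // Returns true if tableName appears in the list of known table names.
  static boolean tableExists(List<String> tableNames, String tableName) {
    if (tableName == null) {
      throw new IllegalArgumentException("Table name cannot be null");
    }
    for (String name : tableNames) {
      if (name.equals(tableName)) {
        return true;  // stop at the first match
      }
    }
    return false;
  }
}
```

Because the answer comes from a single listing, a missing table is
reported right away instead of after all retries have expired.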

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogEdit.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogEdit.java?view=diff&rev=566459&r1=566458&r2=566459
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogEdit.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogEdit.java Wed Aug 15 18:07:51 2007
@@ -34,6 +34,7 @@
   private Text column = new Text();
   private byte [] val;
   private long timestamp;
+  private final int MAX_VALUE_LEN = 128;
 
   /**
    * Default constructor used by Writable
@@ -69,17 +70,23 @@
     return this.timestamp;
   }
 
-  /** {@inheritDoc} */
+  /**
+   * @return First column name, timestamp, and first 128 bytes of the value
+   * bytes as a String.
+   */
   @Override
   public String toString() {
     String value = "";
     try {
-      value = new String(getVal(), HConstants.UTF8_ENCODING);
-      
+      value = (this.val.length > MAX_VALUE_LEN)?
+        new String(this.val, 0, MAX_VALUE_LEN, HConstants.UTF8_ENCODING) +
+          "...":
+        new String(getVal(), HConstants.UTF8_ENCODING);
     } catch (UnsupportedEncodingException e) {
       throw new RuntimeException("UTF8 encoding not present?", e);
     }
-    return "(" + getColumn().toString() + "/" + getTimestamp() + "/" + value + ")";
+    return "(" + getColumn().toString() + "/" + getTimestamp() + "/" +
+      value + ")";
   }
   
   // Writable
@@ -99,4 +106,4 @@
     in.readFully(this.val);
     this.timestamp = in.readLong();
   }
-}
\ No newline at end of file
+}
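
The truncation added to toString above can be tried in isolation with
the sketch below.  The class ValuePreview and its preview method are
hypothetical; the 128-byte limit and the "..." suffix come from the diff.
The sketch uses the Charset overload of the String constructor, which
does not throw UnsupportedEncodingException.  Note that cutting at a
byte offset can split a multi-byte UTF-8 character, in which case the
decoder substitutes a replacement character at the end.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of the logging-friendly value preview.
class ValuePreview {
  static final int MAX_VALUE_LEN = 128;

  // Decodes at most MAX_VALUE_LEN bytes and marks truncation with "...".
  static String preview(byte[] val) {
    if (val.length <= MAX_VALUE_LEN) {
      return new String(val, StandardCharsets.UTF_8);
    }
    return new String(val, 0, MAX_VALUE_LEN, StandardCharsets.UTF_8) + "...";
  }
}
```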

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?view=diff&rev=566459&r1=566458&r2=566459
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Wed Aug 15 18:07:51 2007
@@ -312,18 +312,16 @@
       boolean noReferencesB = splitB == null;
       
       if (!noReferencesA) {
-        noReferencesA =
-          hasReferences(metaRegionName, server, info.getRegionName(), splitA, COL_SPLITA);
+        noReferencesA = hasReferences(metaRegionName, server,
+          info.getRegionName(), splitA, COL_SPLITA);
       }
       if (!noReferencesB) {
-        noReferencesB =
-          hasReferences(metaRegionName, server, info.getRegionName(), splitB, COL_SPLITB);
+        noReferencesB = hasReferences(metaRegionName, server,
+          info.getRegionName(), splitB, COL_SPLITB);
       }
-      if (!(noReferencesA && noReferencesB)) {
-        
+      if (!noReferencesA && !noReferencesB) {
         // No references.  Remove this item from table and deleted region on
         // disk.
-        
         LOG.info("Deleting region " + info.getRegionName() +
         " because daughter splits no longer hold references");
         
@@ -337,7 +335,6 @@
         b.delete(lockid, COL_SERVER);
         b.delete(lockid, COL_STARTCODE);
         server.batchUpdate(metaRegionName, System.currentTimeMillis(), b);
-        
         result = true;
       }
       
@@ -361,8 +358,8 @@
         
         Path [] ps = fs.listPaths(p,
             new PathFilter () {
-              public boolean accept(Path path) {
-                return HStoreFile.isReference(path);
+              public boolean accept(Path p) {
+                return HStoreFile.isReference(p);
               }
             }
         );
@@ -394,18 +391,11 @@
         final String serverName, final long startCode) {
       
       // Skip region - if ...
-      
-      if(info.offLine                                       // offline
-          || killedRegions.contains(info.regionName)        // queued for offline
-          || regionsToDelete.contains(info.regionName)) {   // queued for delete
-
+      if(info.offLine                                     // offline
+          || killedRegions.contains(info.regionName)      // queued for offline
+          || regionsToDelete.contains(info.regionName)) { // queued for delete
         unassignedRegions.remove(info.regionName);
         assignAttempts.remove(info.regionName);
-
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("not assigning region: " + info.regionName + " (offline: " +
-              info.isOffline() + ", split: " + info.isSplit() + ")");
-        }
         return;
       }
 
@@ -416,7 +406,6 @@
             regionsToKill.containsKey(info.regionName)) {
           
           // Skip if region is on kill list
-
           if(LOG.isDebugEnabled()) {
             LOG.debug("not assigning region (on kill list): " + info.regionName);
           }
@@ -431,14 +420,8 @@
           && (storedInfo == null || storedInfo.getStartCode() != startCode)) {
         
         // The current assignment is no good; load the region.
-
         unassignedRegions.put(info.regionName, info);
         assignAttempts.put(info.regionName, Long.valueOf(0L));
-      
-      } else if (LOG.isDebugEnabled()) {
-        LOG.debug("Finished if " + info.getRegionName() + " is assigned: " +
-            "unassigned: " + unassignedRegions.containsKey(info.regionName) +
-            ", pending: " + pendingRegions.contains(info.regionName));
       }
     }
   }
@@ -2155,8 +2138,10 @@
           if (rootRegionLocation.get() == null || !rootScanned) {
             // We can't proceed until the root region is online and has been scanned
             if (LOG.isDebugEnabled()) {
-              LOG.debug("root region=" + rootRegionLocation.get().toString() +
-                  ", rootScanned=" + rootScanned);
+              LOG.debug("root region: " + 
+                ((rootRegionLocation != null)?
+                  rootRegionLocation.toString(): "null") +
+                ", rootScanned: " + rootScanned);
             }
             return false;
           }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java?view=diff&rev=566459&r1=566458&r2=566459
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java Wed Aug 15 18:07:51 2007
@@ -243,6 +243,7 @@
    *
    * TODO - This is kinda slow.  We need a data structure that allows for 
    * proximity-searches, not just precise-matches.
+   * 
    * @param map
    * @param key
    * @param numVersions
@@ -251,13 +252,19 @@
   ArrayList<byte []> get(final TreeMap<HStoreKey, byte []> map,
       final HStoreKey key, final int numVersions) {
     ArrayList<byte []> result = new ArrayList<byte []>();
-    HStoreKey curKey =
-      new HStoreKey(key.getRow(), key.getColumn(), key.getTimestamp());
+    // TODO: If get is of a particular version -- numVersions == 1 -- we
+    // should be able to avoid all of the tailmap creations and iterations
+    // below.
+    HStoreKey curKey = new HStoreKey(key);
     SortedMap<HStoreKey, byte []> tailMap = map.tailMap(curKey);
     for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
       HStoreKey itKey = es.getKey();
       if (itKey.matchesRowCol(curKey)) {
         if(HConstants.DELETE_BYTES.compareTo(es.getValue()) == 0) {
+          // TODO: Shouldn't this be a continue rather than a break?  Perhaps
+          // the intent is that this DELETE_BYTES is meant to suppress older
+          // info -- see 5.4 Compactions in BigTable -- but how does this jibe
+          // with being able to remove one version only?
           break;
         }
         result.add(tailMap.get(itKey));
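
The question raised in the TODO above is easier to see in a small model
of the lookup.  The sketch below is not HMemcache: CellKey, VersionedGet
and the DELETED marker are hypothetical, and it assumes that keys sort by
row, then column, then timestamp with newer versions first, and that the
scan stops at the first key for a different row/column.  With those
assumptions, a delete marker hides every older version of the cell,
which is the "break" behavior the comment is asking about.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical key: row, column, timestamp; newer timestamps sort first.
final class CellKey implements Comparable<CellKey> {
  final String row;
  final String column;
  final long timestamp;

  CellKey(String row, String column, long timestamp) {
    this.row = row;
    this.column = column;
    this.timestamp = timestamp;
  }

  boolean matchesRowCol(CellKey other) {
    return row.equals(other.row) && column.equals(other.column);
  }

  public int compareTo(CellKey other) {
    int c = row.compareTo(other.row);
    if (c == 0) {
      c = column.compareTo(other.column);
    }
    if (c == 0) {
      c = Long.compare(other.timestamp, timestamp);  // reversed on purpose
    }
    return c;
  }
}

class VersionedGet {
  static final String DELETED = "DELETED";  // hypothetical delete marker

  // Collects up to numVersions values at or older than key's timestamp,
  // stopping at a delete marker so that older versions stay hidden.
  static List<String> get(TreeMap<CellKey, String> map, CellKey key,
      int numVersions) {
    List<String> result = new ArrayList<String>();
    for (Map.Entry<CellKey, String> e : map.tailMap(key).entrySet()) {
      if (!e.getKey().matchesRowCol(key)) {
        break;  // walked past this row/column
      }
      if (DELETED.equals(e.getValue())) {
        break;  // a "continue" here would skip only this one version
      }
      result.add(e.getValue());
      if (result.size() >= numVersions) {
        break;
      }
    }
    return result;
  }
}
```

Replacing the second break with continue would instead remove just the
one marked version and let older versions through, which is the
alternative the TODO weighs.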

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?view=diff&rev=566459&r1=566458&r2=566459
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Wed Aug 15 18:07:51 2007
@@ -92,15 +92,13 @@
     // Make sure that srcA comes first; important for key-ordering during
     // write of the merged file.
     FileSystem fs = srcA.getFilesystem();
-    if(srcA.getStartKey() == null) {
-      if(srcB.getStartKey() == null) {
+    if (srcA.getStartKey() == null) {
+      if (srcB.getStartKey() == null) {
         throw new IOException("Cannot merge two regions with null start key");
       }
       // A's start key is null but B's isn't. Assume A comes before B
-      
-    } else if((srcB.getStartKey() == null)         // A is not null but B is
+    } else if ((srcB.getStartKey() == null)         // A is not null but B is
         || (srcA.getStartKey().compareTo(srcB.getStartKey()) > 0)) { // A > B
-      
       a = srcB;
       b = srcA;
     }
@@ -113,10 +111,8 @@
     HTableDescriptor tabledesc = a.getTableDesc();
     HLog log = a.getLog();
     Path rootDir = a.getRootDir();
-
     Text startKey = a.getStartKey();
     Text endKey = b.getEndKey();
-
     Path merges = new Path(a.getRegionDir(), MERGEDIR);
     if(! fs.exists(merges)) {
       fs.mkdirs(merges);
@@ -124,95 +120,20 @@
     
     HRegionInfo newRegionInfo
       = new HRegionInfo(Math.abs(rand.nextLong()), tabledesc, startKey, endKey);
-    
     Path newRegionDir = HRegion.getRegionDir(merges, newRegionInfo.regionName);
-
     if(fs.exists(newRegionDir)) {
-      throw new IOException("Cannot merge; target file collision at " + newRegionDir);
+      throw new IOException("Cannot merge; target file collision at " +
+        newRegionDir);
     }
 
     LOG.info("starting merge of regions: " + a.getRegionName() + " and " +
       b.getRegionName() + " into new region " + newRegionInfo.toString());
-    
-    // Flush each of the sources, and merge their files into a single 
-    // target for each column family.    
-    TreeSet<HStoreFile> alreadyMerged = new TreeSet<HStoreFile>();
-    TreeMap<Text, Vector<HStoreFile>> filesToMerge =
-      new TreeMap<Text, Vector<HStoreFile>>();
-    
-    for(HStoreFile src: a.flushcache(true)) {
-      Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
-      if(v == null) {
-        v = new Vector<HStoreFile>();
-        filesToMerge.put(src.getColFamily(), v);
-      }
-      v.add(src);
-    }
-    
-    for(HStoreFile src: b.flushcache(true)) {
-      Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
-      if(v == null) {
-        v = new Vector<HStoreFile>();
-        filesToMerge.put(src.getColFamily(), v);
-      }
-      v.add(src);
-    }
-    
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("merging stores");
-    }
-    
-    for (Map.Entry<Text, Vector<HStoreFile>> es: filesToMerge.entrySet()) {
-      Text colFamily = es.getKey();
-      Vector<HStoreFile> srcFiles = es.getValue();
-      HStoreFile dst = new HStoreFile(conf, merges, newRegionInfo.regionName, 
-        colFamily, Math.abs(rand.nextLong()));
-      dst.mergeStoreFiles(srcFiles, fs, conf);
-      alreadyMerged.addAll(srcFiles);
-    }
 
-    // That should have taken care of the bulk of the data.
-    // Now close the source HRegions for good, and repeat the above to take care
-    // of any last-minute inserts
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("flushing changes since start of merge for region " 
-          + a.getRegionName());
-    }
-
-    filesToMerge.clear();
-    
-    for(HStoreFile src: a.close()) {
-      if(! alreadyMerged.contains(src)) {
-        Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
-        if(v == null) {
-          v = new Vector<HStoreFile>();
-          filesToMerge.put(src.getColFamily(), v);
-        }
-        v.add(src);
-      }
-    }
-    
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("flushing changes since start of merge for region " 
-          + b.getRegionName());
-    }
-    
-    for(HStoreFile src: b.close()) {
-      if(! alreadyMerged.contains(src)) {
-        Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
-        if(v == null) {
-          v = new Vector<HStoreFile>();
-          filesToMerge.put(src.getColFamily(), v);
-        }
-        v.add(src);
-      }
-    }
-    
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("merging changes since start of merge");
-    }
-    
-    for (Map.Entry<Text, Vector<HStoreFile>> es : filesToMerge.entrySet()) {
+    Map<Text, Vector<HStoreFile>> byFamily =
+      new TreeMap<Text, Vector<HStoreFile>>();
+    byFamily = filesByFamily(byFamily, a.close());
+    byFamily = filesByFamily(byFamily, b.close());
+    for (Map.Entry<Text, Vector<HStoreFile>> es : byFamily.entrySet()) {
       Text colFamily = es.getKey();
       Vector<HStoreFile> srcFiles = es.getValue();
       HStoreFile dst = new HStoreFile(conf, merges, newRegionInfo.regionName,
@@ -233,6 +154,25 @@
     
     return dstRegion;
   }
+  
+  /*
+   * Fills a map with a vector of store files keyed by column family. 
+   * @param byFamily Map to fill.
+   * @param storeFiles Store files to process.
+   * @return Returns <code>byFamily</code>
+   */
+  private static Map<Text, Vector<HStoreFile>> filesByFamily(
+      Map<Text, Vector<HStoreFile>> byFamily, Vector<HStoreFile> storeFiles) {
+    for(HStoreFile src: storeFiles) {
+      Vector<HStoreFile> v = byFamily.get(src.getColFamily());
+      if(v == null) {
+        v = new Vector<HStoreFile>();
+        byFamily.put(src.getColFamily(), v);
+      }
+      v.add(src);
+    }
+    return byFamily;
+  }
 
   //////////////////////////////////////////////////////////////////////////////
   // Members
@@ -254,19 +194,19 @@
   Path regiondir;
 
   static class WriteState {
-    volatile boolean writesOngoing;
-    volatile boolean writesEnabled;
-    WriteState() {
-      this.writesOngoing = true;
-      this.writesEnabled = true;
-    }
+    // Set while a memcache flush is happening.
+    volatile boolean flushing = false;
+    // Set while a compaction is running.
+    volatile boolean compacting = false;
+    // Gets set by last flush before close.  If set, cannot compact or flush
+    // again.
+    volatile boolean writesEnabled = true;
   }
   
   volatile WriteState writestate = new WriteState();
 
   final int memcacheFlushSize;
   final int blockingMemcacheSize;
-  int compactionThreshold = 0;
   private final HLocking lock = new HLocking();
   private long desiredMaxFileSize;
   private final long maxSequenceId;
@@ -297,15 +237,12 @@
   public HRegion(Path rootDir, HLog log, FileSystem fs, Configuration conf, 
       HRegionInfo regionInfo, Path initialFiles)
   throws IOException {
-    
     this.rootDir = rootDir;
     this.log = log;
     this.fs = fs;
     this.conf = conf;
     this.regionInfo = regionInfo;
     this.memcache = new HMemcache();
-    this.writestate.writesOngoing = true;
-    this.writestate.writesEnabled = true;
 
     // Declare the regionName.  This is a unique string for the region, used to 
     // build a unique filename.
@@ -319,7 +256,6 @@
     }
 
     // Load in all the HStores.
-
     long maxSeqId = -1;
     for(Map.Entry<Text, HColumnDescriptor> e :
         this.regionInfo.tableDesc.families().entrySet()) {
@@ -357,17 +293,12 @@
     this.blockingMemcacheSize = this.memcacheFlushSize *
       conf.getInt("hbase.hregion.memcache.block.multiplier", 2);
     
-    // By default, we compact the region if an HStore has more than
-    // MIN_COMMITS_FOR_COMPACTION map files
-    this.compactionThreshold =
-      conf.getInt("hbase.hregion.compactionThreshold", 3);
-    
     // By default we split region if a file > DEFAULT_MAX_FILE_SIZE.
     this.desiredMaxFileSize =
       conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE);
 
     // HRegion is ready to go!
-    this.writestate.writesOngoing = false;
+    this.writestate.compacting = false;
     LOG.info("region " + this.regionInfo.regionName + " available");
   }
   
@@ -411,56 +342,48 @@
    * 
    * @param abort true if server is aborting (only during testing)
    * @return Vector of all the storage files that the HRegion's component 
-   * HStores make use of.  It's a list of HStoreFile objects.
+   * HStores make use of.  It's a list of HStoreFile objects.  Can be null if
+   * we are not to close at this time or we are already closed.
    * 
    * @throws IOException
    */
   Vector<HStoreFile> close(boolean abort) throws IOException {
     if (isClosed()) {
       LOG.info("region " + this.regionInfo.regionName + " already closed");
-      return new Vector<HStoreFile>();
+      return null;
     }
     lock.obtainWriteLock();
     try {
-      boolean shouldClose = false;
       synchronized(writestate) {
-        while(writestate.writesOngoing) {
+        while(writestate.compacting || writestate.flushing) {
           try {
             writestate.wait();
           } catch (InterruptedException iex) {
             // continue
           }
         }
-        writestate.writesOngoing = true;
-        shouldClose = true;
-      }
-
-      if(!shouldClose) {
-        return null;
+        // Disable compacting and flushing by background threads for this
+        // region.
+        writestate.writesEnabled = false;
       }
       
       // Write lock means no more row locks can be given out.  Wait on
       // outstanding row locks to come in before we close so we do not drop
       // outstanding updates.
       waitOnRowLocks();
-
-      Vector<HStoreFile> allHStoreFiles = null;
+      
       if (!abort) {
         // Don't flush the cache if we are aborting during a test.
-        allHStoreFiles = internalFlushcache();
+        internalFlushcache();
       }
+      
+      Vector<HStoreFile> result = new Vector<HStoreFile>();
       for (HStore store: stores.values()) {
-        store.close();
-      }
-      try {
-        return allHStoreFiles;
-      } finally {
-        synchronized (writestate) {
-          writestate.writesOngoing = false;
-        }
-        this.closed.set(true);
-        LOG.info("closed " + this.regionInfo.regionName);
+        result.addAll(store.close());
       }
+      this.closed.set(true);
+      LOG.info("closed " + this.regionInfo.regionName);
+      return result;
     } finally {
       lock.releaseWriteLock();
     }
@@ -527,6 +450,7 @@
       HStoreFile a = new HStoreFile(this.conf, splits,
         regionAInfo.regionName, h.getColFamily(), Math.abs(rand.nextLong()),
         aReference);
+      // Reference to top half of the hsf store file.
       HStoreFile.Reference bReference = new HStoreFile.Reference(
         getRegionName(), h.getFileId(), new HStoreKey(midKey),
         HStoreFile.Range.top);
@@ -721,12 +645,10 @@
     boolean needsCompaction = false;
     this.lock.obtainReadLock();
     try {
-      for(HStore store: stores.values()) {
-        if(store.getNMaps() > this.compactionThreshold) {
+      for (HStore store: stores.values()) {
+        if (store.needsCompaction()) {
           needsCompaction = true;
-          LOG.info(getRegionName().toString() + " needs compaction because " +
-            store.getNMaps() + " store files present and threshold is " +
-            this.compactionThreshold);
+          LOG.info(store.toString() + " needs compaction");
           break;
         }
       }
@@ -756,9 +678,9 @@
     lock.obtainReadLock();
     try {
       synchronized (writestate) {
-        if ((!writestate.writesOngoing) &&
+        if ((!writestate.compacting) &&
             writestate.writesEnabled) {
-          writestate.writesOngoing = true;
+          writestate.compacting = true;
           shouldCompact = true;
         }
       }
@@ -783,7 +705,7 @@
     } finally {
       lock.releaseReadLock();
       synchronized (writestate) {
-        writestate.writesOngoing = false;
+        writestate.compacting = false;
         writestate.notifyAll();
       }
     }
@@ -825,23 +747,17 @@
    * close() the HRegion shortly, so the HRegion should not take on any new and 
    * potentially long-lasting disk operations. This flush() should be the final
    * pre-close() disk operation.
-   * 
-   * @return List of store files including new flushes, if any.  If no flushes
-   * because  memcache is null, returns all current store files.  Returns
-   * null if no flush (Writes are going on elsewhere -- concurrently we are
-   * compacting or splitting).
    */
-  Vector<HStoreFile> flushcache(boolean disableFutureWrites)
+  void flushcache(boolean disableFutureWrites)
   throws IOException {
     if (this.closed.get()) {
-      return null;
+      return;
     }
     this.noFlushCount = 0;
     boolean shouldFlush = false;
     synchronized(writestate) {
-      if((!writestate.writesOngoing) &&
-          writestate.writesEnabled) {
-        writestate.writesOngoing = true;
+      if((!writestate.flushing) && writestate.writesEnabled) {
+        writestate.flushing = true;
         shouldFlush = true;
         if(disableFutureWrites) {
           writestate.writesEnabled = false;
@@ -854,14 +770,14 @@
         LOG.debug("NOT flushing memcache for region " +
           this.regionInfo.regionName);
       }
-      return null;  
+      return;  
     }
     
     try {
-      return internalFlushcache();
+      internalFlushcache();
     } finally {
       synchronized (writestate) {
-        writestate.writesOngoing = false;
+        writestate.flushing = false;
         writestate.notifyAll();
       }
     }
@@ -892,11 +808,8 @@
    * routes.
    * 
    * <p> This method may block for some time.
-   * 
-   * @return List of store files including just-made new flushes per-store. If
-   * not flush, returns list of all store files.
    */
-  Vector<HStoreFile> internalFlushcache() throws IOException {
+  void internalFlushcache() throws IOException {
     long startTime = -1;
     if(LOG.isDebugEnabled()) {
       startTime = System.currentTimeMillis();
@@ -917,7 +830,7 @@
     HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log);
     if(retval == null || retval.memcacheSnapshot == null) {
       LOG.debug("Finished memcache flush; empty snapshot");
-      return getAllStoreFiles();
+      return;
     }
     long logCacheFlushId = retval.sequenceId;
     if(LOG.isDebugEnabled()) {
@@ -929,11 +842,8 @@
     // A.  Flush memcache to all the HStores.
     // Keep running vector of all store files that includes both old and the
     // just-made new flush store file.
-    Vector<HStoreFile> allHStoreFiles = new Vector<HStoreFile>();
     for(HStore hstore: stores.values()) {
-      Vector<HStoreFile> hstoreFiles
-        = hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
-      allHStoreFiles.addAll(0, hstoreFiles);
+      hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
     }
 
     // B.  Write a FLUSHCACHE-COMPLETE message to the log.
@@ -958,13 +868,12 @@
         this.regionInfo.regionName + " in " +
           (System.currentTimeMillis() - startTime) + "ms");
     }
-    return allHStoreFiles;
   }
   
   private Vector<HStoreFile> getAllStoreFiles() {
     Vector<HStoreFile> allHStoreFiles = new Vector<HStoreFile>();
     for(HStore hstore: stores.values()) {
-      Vector<HStoreFile> hstoreFiles = hstore.getAllMapFiles();
+      Vector<HStoreFile> hstoreFiles = hstore.getAllStoreFiles();
       allHStoreFiles.addAll(0, hstoreFiles);
     }
     return allHStoreFiles;
@@ -1020,7 +929,6 @@
       }
 
       // If unavailable in memcache, check the appropriate HStore
-
       Text colFamily = HStoreKey.extractFamily(key.getColumn());
       HStore targetStore = stores.get(colFamily);
       if(targetStore == null) {
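
Editor's note on the HRegion.java changes above: the single writesOngoing flag is replaced by separate flushing and compacting flags, so a flush can start while a compaction is running, and close() waits for both before clearing writesEnabled. The following is a minimal, self-contained sketch of that gating idea only. The class and method names (WriteStateSketch, tryStartFlush, closeForWrites, and so on) are hypothetical and do not appear in the commit, and the sketch uses synchronized methods rather than the commit's synchronized(writestate) blocks.

```java
// Hypothetical sketch, not code from the commit: models the three flags
// (flushing, compacting, writesEnabled) behind one monitor.
class WriteStateSketch {
    private boolean flushing = false;
    private boolean compacting = false;
    private boolean writesEnabled = true;

    /** Claim the flush slot; a running compaction does not block it. */
    public synchronized boolean tryStartFlush() {
        if (flushing || !writesEnabled) {
            return false;
        }
        flushing = true;
        return true;
    }

    /** Release the flush slot and wake any thread waiting to close. */
    public synchronized void finishFlush() {
        flushing = false;
        notifyAll();
    }

    /** Claim the compaction slot; a running flush does not block it. */
    public synchronized boolean tryStartCompaction() {
        if (compacting || !writesEnabled) {
            return false;
        }
        compacting = true;
        return true;
    }

    /** Release the compaction slot and wake any thread waiting to close. */
    public synchronized void finishCompaction() {
        compacting = false;
        notifyAll();
    }

    /** Wait for in-flight work, then refuse all further flushes/compactions. */
    public synchronized void closeForWrites() {
        boolean interrupted = false;
        while (flushing || compacting) {
            try {
                wait();
            } catch (InterruptedException e) {
                interrupted = true; // keep waiting, restore the flag afterwards
            }
        }
        writesEnabled = false;
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        WriteStateSketch state = new WriteStateSketch();
        System.out.println("compaction started: " + state.tryStartCompaction());
        System.out.println("flush started during compaction: "
            + state.tryStartFlush());
        state.finishFlush();
        state.finishCompaction();
        state.closeForWrites();
        System.out.println("flush started after close: " + state.tryStartFlush());
    }
}
```

Keeping the two flags independent is what lets updates continue during a long compaction: only a second flush, or a close, has to wait.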

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?view=diff&rev=566459&r1=566458&r2=566459
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Wed Aug 15 18:07:51 2007
@@ -158,7 +158,8 @@
           try {
             for(HRegion cur: regionsToCheck) {
               if(cur.isClosed()) {
-                continue;                               // Skip if closed
+                // Skip if closed
+                continue;
               }
               if (cur.needsCompaction()) {
                 cur.compactStores();
@@ -272,10 +273,6 @@
   protected final Integer cacheFlusherLock = new Integer(0);
   
   /* Runs periodically to flush memcache.
-   * 
-   * Memcache flush is also called just before compaction and just before
-   * split so memcache is best prepared for the the long trip across
-   * compactions/splits during which it will not be able to flush to disk.
    */
   class Flusher implements Runnable {
     /**
@@ -286,9 +283,7 @@
         long startTime = System.currentTimeMillis();
 
         synchronized(cacheFlusherLock) {
-
           // Grab a list of items to flush
-
           Vector<HRegion> toFlush = new Vector<HRegion>();
           lock.readLock().lock();
           try {
@@ -837,6 +832,7 @@
   BlockingQueue<ToDoEntry> toDo;
   private Worker worker;
   private Thread workerThread;
+  
   /** Thread that performs long running requests from the master */
   class Worker implements Runnable {
     void stop() {
@@ -910,7 +906,6 @@
     HRegion region = onlineRegions.get(regionInfo.regionName);
     if(region == null) {
       region = new HRegion(rootDir, log, fs, conf, regionInfo, null);
-
       this.lock.writeLock().lock();
       try {
         this.log.setSequenceNumber(region.getMaxSequenceId());
@@ -1193,7 +1188,7 @@
    * @return {@link HRegion} for <code>regionName</code>
    * @throws NotServingRegionException
    */
-  protected HRegion getRegion(final Text regionName,
+  protected HRegion getRegion(final Text regionName, 
       final boolean checkRetiringRegions)
   throws NotServingRegionException {
     HRegion region = null;

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?view=diff&rev=566459&r1=566458&r2=566459
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Wed Aug 15 18:07:51 2007
@@ -30,6 +30,7 @@
 import java.util.Random;
 import java.util.TreeMap;
 import java.util.Vector;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,6 +43,7 @@
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.StringUtils;
 import org.onelab.filter.BloomFilter;
@@ -92,6 +94,8 @@
   Random rand = new Random();
   
   private long maxSeqId;
+  
+  private int compactionThreshold;
 
   /**
    * An HStore is a set of zero or more MapFiles, which stretch backwards over 
@@ -164,7 +168,7 @@
 
     if(LOG.isDebugEnabled()) {
       LOG.debug("starting " + this.storeName +
-        ((reconstructionLog == null)?
+        ((reconstructionLog == null || !fs.exists(reconstructionLog))?
           " (no reconstruction log)": " with reconstruction log: " +
           reconstructionLog.toString()));
     }
@@ -215,19 +219,19 @@
     }
     
     doReconstructionLog(reconstructionLog, maxSeqId);
-    this.maxSeqId += 1;
 
-    // Compact all the MapFiles into a single file.  The resulting MapFile 
-    // should be "timeless"; that is, it should not have an associated seq-ID, 
-    // because all log messages have been reflected in the TreeMaps at this
-    // point.  
-    //
-    // TODO: Only do the compaction if we are over a threshold, not
-    // every time. Not necessary if only two or three store files.  Fix after
-    // revamp of compaction.
-    if(storefiles.size() > 1) {
-      compactHelper(true);
-    }
+    // By default, we compact once an HStore has at least
+    // hbase.hstore.compactionThreshold (default 3) store files.
+    this.compactionThreshold =
+      conf.getInt("hbase.hstore.compactionThreshold", 3);
+    
+    // We used to compact in here before bringing the store online.  Instead
+    // get it online quickly even if it needs compaction so we can start
+    // taking updates as soon as possible (once online, it can take updates
+    // even during a compaction).
+
+    // Move maxSeqId on by one. Why here?  And not in HRegion?
+    this.maxSeqId += 1;
     
     // Finally, start up all the map readers! (There should be just one at this 
     // point, as we've compacted them all.)
@@ -253,10 +257,6 @@
       final long maxSeqID)
   throws UnsupportedEncodingException, IOException {
     if (reconstructionLog == null || !fs.exists(reconstructionLog)) {
-      if (reconstructionLog != null && !fs.exists(reconstructionLog)) {
-        LOG.warn("Passed reconstruction log " + reconstructionLog +
-          " does not exist");
-      }
       // Nothing to do.
       return;
     }
@@ -397,15 +397,18 @@
    * Close all the MapFile readers
    * @throws IOException
    */
-  void close() throws IOException {
+  Vector<HStoreFile> close() throws IOException {
+    Vector<HStoreFile> result = null;
     this.lock.obtainWriteLock();
     try {
       for (MapFile.Reader reader: this.readers.values()) {
         reader.close();
       }
       this.readers.clear();
+      result = new Vector<HStoreFile>(storefiles.values());
       this.storefiles.clear();
       LOG.info("closed " + this.storeName);
+      return result;
     } finally {
       this.lock.releaseWriteLock();
     }
@@ -428,16 +431,15 @@
    *
    * @param inputCache memcache to flush
    * @param logCacheFlushId flush sequence number
-   * @return Vector of all the HStoreFiles in use
    * @throws IOException
    */
-  Vector<HStoreFile> flushCache(final TreeMap<HStoreKey, byte []> inputCache,
+  void flushCache(final TreeMap<HStoreKey, byte []> inputCache,
     final long logCacheFlushId)
   throws IOException {
-    return flushCacheHelper(inputCache, logCacheFlushId, true);
+    flushCacheHelper(inputCache, logCacheFlushId, true);
   }
   
-  Vector<HStoreFile> flushCacheHelper(TreeMap<HStoreKey, byte []> inputCache,
+  void flushCacheHelper(TreeMap<HStoreKey, byte []> inputCache,
       long logCacheFlushId, boolean addToAvailableMaps)
   throws IOException {
     synchronized(flushLock) {
@@ -447,12 +449,31 @@
       String name = flushedFile.toString();
       MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression,
         this.bloomFilter);
+      
+      // hbase.hstore.compact.on.flush=true enables picking up an existing
+      // HStoreFile from disk and interlacing it with the memcache flush,
+      // compacting as we go.  The notion is that interlacing would take as
+      // long as a pure flush with the added benefit of having one less file
+      // in the store.  Experiments show that it takes two to three times as
+      // long as a plain flush -- more column families bring the two timings
+      // closer together -- and it also complicates the flush.  Disabled for
+      // now.  Needs work picking which file to interlace (favor references
+      // first, etc.)
+      //
+      // Related: 'merging compactions' in the BigTable paper appear to
+      // interlace a memcache flush.  We don't.
       try {
-        for (Map.Entry<HStoreKey, byte []> es: inputCache.entrySet()) {
-          HStoreKey curkey = es.getKey();
-          if (this.familyName.
-              equals(HStoreKey.extractFamily(curkey.getColumn()))) {
-            out.append(curkey, new ImmutableBytesWritable(es.getValue()));
+        if (this.conf.getBoolean("hbase.hstore.compact.on.flush", false) &&
+            this.storefiles.size() > 0) {
+          compact(out, inputCache.entrySet().iterator(),
+              this.readers.get(this.storefiles.firstKey()));
+        } else {
+          for (Map.Entry<HStoreKey, byte []> es: inputCache.entrySet()) {
+            HStoreKey curkey = es.getKey();
+            if (this.familyName.
+                equals(HStoreKey.extractFamily(curkey.getColumn()))) {
+              out.append(curkey, new ImmutableBytesWritable(es.getValue()));
+            }
           }
         }
       } finally {
@@ -486,14 +507,14 @@
           this.lock.releaseWriteLock();
         }
       }
-      return getAllMapFiles();
+      return;
     }
   }
 
   /**
    * @return - vector of all the HStore files in use
    */
-  Vector<HStoreFile> getAllMapFiles() {
+  Vector<HStoreFile> getAllStoreFiles() {
     this.lock.obtainReadLock();
     try {
       return new Vector<HStoreFile>(storefiles.values());
@@ -505,6 +526,14 @@
   //////////////////////////////////////////////////////////////////////////////
   // Compaction
   //////////////////////////////////////////////////////////////////////////////
+  
+  /**
+   * @return True if this store needs compaction.
+   */
+  public boolean needsCompaction() {
+    return this.storefiles != null &&
+    this.storefiles.size() >= this.compactionThreshold;
+  }
 
   /**
    * Compact the back-HStores.  This method may take some time, so the calling 
@@ -528,11 +557,24 @@
     compactHelper(false);
   }
   
-  void compactHelper(boolean deleteSequenceInfo) throws IOException {
+  void compactHelper(final boolean deleteSequenceInfo)
+  throws IOException {
+    compactHelper(deleteSequenceInfo, -1);
+  }
+  
+  /* 
+   * @param deleteSequenceInfo True if we are to set the sequence number to -1
+   * on compacted file.
+   * @param maxSeenSeqID We may have already calculated the maxSeenSeqID.  If
+   * so, pass it here.  Otherwise, pass -1 and it will be calculated inside
+   * this method.
+   * @throws IOException
+   */
+  void compactHelper(final boolean deleteSequenceInfo, long maxSeenSeqID)
+  throws IOException {
     synchronized(compactLock) {
       Path curCompactStore =
         HStoreFile.getHStoreDir(compactdir, regionName, familyName);
-      fs.mkdirs(curCompactStore);
       if(LOG.isDebugEnabled()) {
         LOG.debug("started compaction of " + storefiles.size() + " files in " +
           curCompactStore.toString());
@@ -547,28 +589,32 @@
           this.lock.releaseWriteLock();
         }
 
-        // Compute the max-sequenceID seen in any of the to-be-compacted
-        // TreeMaps
-        long maxSeenSeqID = -1;
-        for (HStoreFile hsf: toCompactFiles) {
-          long seqid = hsf.loadInfo(fs);
-          if(seqid > 0) {
-            if(seqid > maxSeenSeqID) {
-              maxSeenSeqID = seqid;
-            }
-          }
-        }
-
-        HStoreFile compactedOutputFile 
-          = new HStoreFile(conf, compactdir, regionName, familyName, -1);
-        if(toCompactFiles.size() == 1) {
-          // TODO: Only rewrite if NOT a HSF reference file.
-          if(LOG.isDebugEnabled()) {
+        HStoreFile compactedOutputFile =
+          new HStoreFile(conf, compactdir, regionName, familyName, -1);
+        if (toCompactFiles.size() < 1 ||
+            (toCompactFiles.size() == 1 &&
+              !toCompactFiles.get(0).isReference())) {
+          if (LOG.isDebugEnabled()) {
             LOG.debug("nothing to compact for " + this.storeName);
           }
-          HStoreFile hsf = toCompactFiles.elementAt(0);
-          if(hsf.loadInfo(fs) == -1) {
-            return;
+          if (deleteSequenceInfo && toCompactFiles.size() == 1) {
+            toCompactFiles.get(0).writeInfo(fs, -1);
+          }
+          return;
+        }
+        
+        fs.mkdirs(curCompactStore);
+        
+        // Compute the max-sequenceID seen in any of the to-be-compacted
+        // TreeMaps if it hasn't been passed in to us.
+        if (maxSeenSeqID == -1) {
+          for (HStoreFile hsf: toCompactFiles) {
+            long seqid = hsf.loadInfo(fs);
+            if(seqid > 0) {
+              if(seqid > maxSeenSeqID) {
+                maxSeenSeqID = seqid;
+              }
+            }
           }
         }
 
@@ -577,108 +623,11 @@
           compactedOutputFile.getWriter(this.fs, this.compression,
             this.bloomFilter);
         try {
-          // We create a new set of MapFile.Reader objects so we don't screw up 
-          // the caching associated with the currently-loaded ones.
-          //
-          // Our iteration-based access pattern is practically designed to ruin 
-          // the cache.
-          //
-          // We work by opening a single MapFile.Reader for each file, and 
-          // iterating through them in parallel.  We always increment the 
-          // lowest-ranked one.  Updates to a single row/column will appear 
-          // ranked by timestamp.  This allows us to throw out deleted values or
-          // obsolete versions.
-          MapFile.Reader[] rdrs = new MapFile.Reader[toCompactFiles.size()];
-          HStoreKey[] keys = new HStoreKey[toCompactFiles.size()];
-          ImmutableBytesWritable[] vals =
-            new ImmutableBytesWritable[toCompactFiles.size()];
-          boolean[] done = new boolean[toCompactFiles.size()];
-          int pos = 0;
-          for(HStoreFile hsf: toCompactFiles) {
-            rdrs[pos] = hsf.getReader(this.fs, this.bloomFilter);
-            keys[pos] = new HStoreKey();
-            vals[pos] = new ImmutableBytesWritable();
-            done[pos] = false;
-            pos++;
-          }
-
-          // Now, advance through the readers in order.  This will have the
-          // effect of a run-time sort of the entire dataset.
-          int numDone = 0;
-          for(int i = 0; i < rdrs.length; i++) {
-            rdrs[i].reset();
-            done[i] = ! rdrs[i].next(keys[i], vals[i]);
-            if(done[i]) {
-              numDone++;
-            }
-          }
-          
-          int timesSeen = 0;
-          Text lastRow = new Text();
-          Text lastColumn = new Text();
-          while(numDone < done.length) {
-            // Find the reader with the smallest key
-            int smallestKey = -1;
-            for(int i = 0; i < rdrs.length; i++) {
-              if(done[i]) {
-                continue;
-              }
-              
-              if(smallestKey < 0) {
-                smallestKey = i;
-              } else {
-                if(keys[i].compareTo(keys[smallestKey]) < 0) {
-                  smallestKey = i;
-                }
-              }
-            }
-
-            // Reflect the current key/val in the output
-            HStoreKey sk = keys[smallestKey];
-            if(lastRow.equals(sk.getRow())
-                && lastColumn.equals(sk.getColumn())) {
-              timesSeen++;
-            } else {
-              timesSeen = 1;
-            }
-            
-            if(timesSeen <= family.getMaxVersions()) {
-              // Keep old versions until we have maxVersions worth.
-              // Then just skip them.
-              if(sk.getRow().getLength() != 0
-                  && sk.getColumn().getLength() != 0) {
-                // Only write out objects which have a non-zero length key and
-                // value
-                compactedOut.append(sk, vals[smallestKey]);
-              }
-            }
-
-            // TODO: I don't know what to do about deleted values.  I currently 
-            // include the fact that the item was deleted as a legitimate 
-            // "version" of the data.  Maybe it should just drop the deleted
-            // val?
-
-            // Update last-seen items
-            lastRow.set(sk.getRow());
-            lastColumn.set(sk.getColumn());
-
-            // Advance the smallest key.  If that reader's all finished, then 
-            // mark it as done.
-            if(! rdrs[smallestKey].next(keys[smallestKey],
-                vals[smallestKey])) {
-              done[smallestKey] = true;
-              rdrs[smallestKey].close();
-              numDone++;
-            }
-          }
+          compact(compactedOut, toCompactFiles);
         } finally {
           compactedOut.close();
         }
 
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("writing new compacted HStore " + compactedOutputFile);
-        }
-
         // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
         if((! deleteSequenceInfo) && maxSeenSeqID >= 0) {
           compactedOutputFile.writeInfo(fs, maxSeenSeqID);
@@ -691,8 +640,7 @@
         DataOutputStream out = new DataOutputStream(fs.create(filesToReplace));
         try {
           out.writeInt(toCompactFiles.size());
-          for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext(); ) {
-            HStoreFile hsf = it.next();
+          for(HStoreFile hsf: toCompactFiles) {
             hsf.write(out);
           }
         } finally {
@@ -706,7 +654,207 @@
         // Move the compaction into place.
         processReadyCompaction();
       } finally {
-        fs.delete(compactdir);
+        if (fs.exists(compactdir)) {
+          fs.delete(compactdir);
+        }
+      }
+    }
+  }
+  
+  /*
+   * Compact passed <code>toCompactFiles</code> into <code>compactedOut</code>. 
+   * We create a new set of MapFile.Reader objects so we don't screw up 
+   * the caching associated with the currently-loaded ones. Our
+   * iteration-based access pattern is practically designed to ruin 
+   * the cache.
+   *
+   * We work by opening a single MapFile.Reader for each file, and 
+   * iterating through them in parallel.  We always increment the 
+   * lowest-ranked one.  Updates to a single row/column will appear 
+   * ranked by timestamp.  This allows us to throw out deleted values or
+   * obsolete versions.
+   * @param compactedOut
+   * @param toCompactFiles
+   * @throws IOException
+   */
+  void compact(final MapFile.Writer compactedOut,
+      final Vector<HStoreFile> toCompactFiles)
+  throws IOException {
+    int size = toCompactFiles.size();
+    CompactionReader[] rdrs = new CompactionReader[size];
+    int index = 0;
+    for (HStoreFile hsf: toCompactFiles) {
+      try {
+        rdrs[index++] =
+          new MapFileCompactionReader(hsf.getReader(fs, bloomFilter));
+      } catch (IOException e) {
+        // Add info about which file threw exception. It may not be in the
+        // exception message so output a message here where we know the
+        // culprit.
+        LOG.warn("Failed with " + e.toString() + ": " + hsf.toString() +
+          (hsf.isReference()? " " + hsf.getReference().toString(): ""));
+        throw e;
+      }
+    }
+    try {
+      compact(compactedOut, rdrs);
+    } finally {
+      for (int i = 0; i < rdrs.length; i++) {
+        if (rdrs[i] != null) {
+          try {
+            rdrs[i].close();
+          } catch (IOException e) {
+            LOG.warn("Exception closing reader", e);
+          }
+        }
+      }
+    }
+  }
+  
+  interface CompactionReader {
+    public void close() throws IOException;
+    public boolean next(WritableComparable key, Writable val)
+      throws IOException;
+    public void reset() throws IOException;
+  }
+  
+  class MapFileCompactionReader implements CompactionReader {
+    final MapFile.Reader reader;
+    
+    MapFileCompactionReader(final MapFile.Reader r) {
+      this.reader = r;
+    }
+    
+    public void close() throws IOException {
+      this.reader.close();
+    }
+
+    public boolean next(WritableComparable key, Writable val)
+    throws IOException {
+      return this.reader.next(key, val);
+    }
+
+    public void reset() throws IOException {
+      this.reader.reset();
+    }
+  }
+  
+  void compact(final MapFile.Writer compactedOut,
+      final Iterator<Entry<HStoreKey, byte []>> iterator,
+      final MapFile.Reader reader)
+  throws IOException {
+    // Make an instance of a CompactionReader that wraps the iterator.
+    CompactionReader cr = new CompactionReader() {
+      public boolean next(WritableComparable key, Writable val)
+          throws IOException {
+        boolean result = false;
+        while (iterator.hasNext()) {
+          Entry<HStoreKey, byte []> e = iterator.next();
+          HStoreKey hsk = e.getKey();
+          if (familyName.equals(HStoreKey.extractFamily(hsk.getColumn()))) {
+            ((HStoreKey)key).set(hsk);
+            ((ImmutableBytesWritable)val).set(e.getValue());
+            result = true;
+            break;
+          }
+        }
+        return result;
+      }
+
+      @SuppressWarnings("unused")
+      public void reset() throws IOException {
+        // noop.
+      }
+      
+      @SuppressWarnings("unused")
+      public void close() throws IOException {
+        // noop.
+      }
+    };
+    
+    compact(compactedOut,
+      new CompactionReader [] {cr, new MapFileCompactionReader(reader)});
+  }
+  
+  void compact(final MapFile.Writer compactedOut,
+      final CompactionReader [] rdrs)
+  throws IOException {
+    HStoreKey[] keys = new HStoreKey[rdrs.length];
+    ImmutableBytesWritable[] vals = new ImmutableBytesWritable[rdrs.length];
+    boolean[] done = new boolean[rdrs.length];
+    for(int i = 0; i < rdrs.length; i++) {
+      keys[i] = new HStoreKey();
+      vals[i] = new ImmutableBytesWritable();
+      done[i] = false;
+    }
+
+    // Now, advance through the readers in order.  This will have the
+    // effect of a run-time sort of the entire dataset.
+    int numDone = 0;
+    for(int i = 0; i < rdrs.length; i++) {
+      rdrs[i].reset();
+      done[i] = ! rdrs[i].next(keys[i], vals[i]);
+      if(done[i]) {
+        numDone++;
+      }
+    }
+
+    int timesSeen = 0;
+    Text lastRow = new Text();
+    Text lastColumn = new Text();
+    while(numDone < done.length) {
+      // Find the reader with the smallest key
+      int smallestKey = -1;
+      for(int i = 0; i < rdrs.length; i++) {
+        if(done[i]) {
+          continue;
+        }
+        if(smallestKey < 0) {
+          smallestKey = i;
+        } else {
+          if(keys[i].compareTo(keys[smallestKey]) < 0) {
+            smallestKey = i;
+          }
+        }
+      }
+
+      // Reflect the current key/val in the output
+      HStoreKey sk = keys[smallestKey];
+      if(lastRow.equals(sk.getRow())
+          && lastColumn.equals(sk.getColumn())) {
+        timesSeen++;
+      } else {
+        timesSeen = 1;
+      }
+
+      if(timesSeen <= family.getMaxVersions()) {
+        // Keep old versions until we have maxVersions worth.
+        // Then just skip them.
+        if(sk.getRow().getLength() != 0
+            && sk.getColumn().getLength() != 0) {
+          // Only write out objects which have a non-zero length row and
+          // column
+          compactedOut.append(sk, vals[smallestKey]);
+        }
+      }
+
+      // TODO: I don't know what to do about deleted values.  I currently 
+      // include the fact that the item was deleted as a legitimate 
+      // "version" of the data.  Maybe it should just drop the deleted
+      // val?
+
+      // Update last-seen items
+      lastRow.set(sk.getRow());
+      lastColumn.set(sk.getColumn());
+
+      // Advance the smallest key.  If that reader's all finished, then 
+      // mark it as done.
+      if(!rdrs[smallestKey].next(keys[smallestKey],
+          vals[smallestKey])) {
+        done[smallestKey] = true;
+        rdrs[smallestKey].close();
+        rdrs[smallestKey] = null;
+        numDone++;
       }
     }
   }
@@ -773,21 +921,19 @@
         }
       }
 
+      Vector<HStoreFile> toDelete = new Vector<HStoreFile>(keys.size());
       for (Long key: keys) {
         MapFile.Reader reader = this.readers.remove(key);
         if (reader != null) {
           reader.close();
         }
         HStoreFile hsf = this.storefiles.remove(key);
-        // 4. Delete all old files, no longer needed
-        hsf.delete();
-      }
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("deleted " + toCompactFiles.size() + " old file(s)");
+        // 4. Queue all old files, no longer needed, for deletion
+        toDelete.add(hsf);
       }
       
-      // What if we fail now?  The above deletes will fail silently. We'd better
-      // make sure not to write out any new files with the same names as 
+      // What if we fail now?  The above deletes will fail silently. We'd
+      // better make sure not to write out any new files with the same names as 
       // something we delete, though.
 
       // 5. Moving the new MapFile into place
@@ -800,9 +946,23 @@
           compactdir.toString() +
           " to " + finalCompactedFile.toString() + " in " + dir.toString());
       }
-      compactedFile.rename(this.fs, finalCompactedFile);
+      if (!compactedFile.rename(this.fs, finalCompactedFile)) {
+        LOG.error("Failed move of compacted file " +
+          finalCompactedFile.toString());
+        return;
+      }
+      
+      // Safe to delete now compaction has been moved into place.
+      for (HStoreFile hsf: toDelete) {
+        if (hsf.getFileId() == finalCompactedFile.getFileId()) {
+          // Be careful we do not delete the just compacted file.
+          LOG.warn("Weird. File to delete has same name as the just " +
+            "compacted file (skipping): " + hsf.getFileId());
+          continue;
+        }
+        hsf.delete();
+      }
 
-      // Fail here?  No worries.
       Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
 
       // 6. Loading the new TreeMap.
@@ -810,7 +970,6 @@
         finalCompactedFile.getReader(this.fs, this.bloomFilter));
       this.storefiles.put(orderVal, finalCompactedFile);
     } finally {
-      
       // 7. Releasing the write-lock
       this.lock.releaseWriteLock();
     }
@@ -838,6 +997,9 @@
           map.reset();
           ImmutableBytesWritable readval = new ImmutableBytesWritable();
           HStoreKey readkey = (HStoreKey)map.getClosest(key, readval);
+          if (readkey == null) {
+            continue;
+          }
           do {
             Text readcol = readkey.getColumn();
             if (results.get(readcol) == null
@@ -1004,7 +1166,7 @@
   /**
    * @return    Returns the number of map files currently in use
    */
-  int getNMaps() {
+  int countOfStoreFiles() {
     this.lock.obtainReadLock();
     try {
       return storefiles.size();
@@ -1014,6 +1176,22 @@
     }
   }
   
+  boolean hasReferences() {
+    boolean result = false;
+    this.lock.obtainReadLock();
+    try {
+        for (HStoreFile hsf: this.storefiles.values()) {
+          if (hsf.isReference()) {
+            result = true;
+            break;
+          }
+        }
+    } finally {
+      this.lock.releaseReadLock();
+    }
+    return result;
+  }
+  
   //////////////////////////////////////////////////////////////////////////////
   // File administration
   //////////////////////////////////////////////////////////////////////////////
@@ -1037,6 +1215,11 @@
       Text firstRow) throws IOException {
     
     return new HStoreScanner(timestamp, targetCols, firstRow);
+  }
+  
+  @Override
+  public String toString() {
+    return this.storeName;
   }
 
   //////////////////////////////////////////////////////////////////////////////
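
The compaction merge loop in the HStore.java hunk above can be sketched in isolation as follows. This is a minimal sketch under stated assumptions, not the committed code: the nested Key class is a hypothetical stand-in for HStoreKey, in-memory lists stand in for MapFile readers, and the maxVersions argument plays the role of family.getMaxVersions().

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class CompactionMergeSketch {
  /** Hypothetical stand-in for HStoreKey: row, column, timestamp. */
  static final class Key implements Comparable<Key> {
    final String row;
    final String column;
    final long timestamp;

    Key(String row, String column, long timestamp) {
      this.row = row;
      this.column = column;
      this.timestamp = timestamp;
    }

    @Override
    public int compareTo(Key o) {
      int c = row.compareTo(o.row);
      if (c == 0) {
        c = column.compareTo(o.column);
      }
      if (c == 0) {
        // Newer timestamps sort first, as in HStoreKey.
        c = Long.compare(o.timestamp, timestamp);
      }
      return c;
    }

    @Override
    public String toString() {
      return row + "/" + column + "@" + timestamp;
    }
  }

  /** Merge already-sorted inputs, keeping at most maxVersions per cell. */
  static List<Key> merge(List<List<Key>> inputs, int maxVersions) {
    int n = inputs.size();
    List<Iterator<Key>> readers = new ArrayList<>();
    Key[] heads = new Key[n];
    for (int i = 0; i < n; i++) {
      Iterator<Key> it = inputs.get(i).iterator();
      readers.add(it);
      heads[i] = it.hasNext() ? it.next() : null;  // null marks "done"
    }
    List<Key> out = new ArrayList<>();
    String lastRow = null;
    String lastColumn = null;
    int timesSeen = 0;
    while (true) {
      // Find the reader with the smallest current key.
      int smallest = -1;
      for (int i = 0; i < n; i++) {
        if (heads[i] == null) {
          continue;
        }
        if (smallest < 0 || heads[i].compareTo(heads[smallest]) < 0) {
          smallest = i;
        }
      }
      if (smallest < 0) {
        break;  // every reader is exhausted
      }
      Key k = heads[smallest];
      if (k.row.equals(lastRow) && k.column.equals(lastColumn)) {
        timesSeen++;
      } else {
        timesSeen = 1;
      }
      // Keep versions until maxVersions is reached; skip empty row/column.
      if (timesSeen <= maxVersions && !k.row.isEmpty() && !k.column.isEmpty()) {
        out.add(k);
      }
      lastRow = k.row;
      lastColumn = k.column;
      // Advance the reader that supplied the smallest key.
      Iterator<Key> it = readers.get(smallest);
      heads[smallest] = it.hasNext() ? it.next() : null;
    }
    return out;
  }
}
```

Because newer timestamps sort first, the version cut-off drops the oldest versions of a cell. The sketch uses the same linear scan for the smallest key as the hunk; a priority queue would be an alternative when many inputs are merged.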

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java?view=diff&rev=566459&r1=566458&r2=566459
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java Wed Aug 15 18:07:51 2007
@@ -399,7 +399,13 @@
       Path mapfile = curfile.getMapFilePath();
       if (!fs.exists(mapfile)) {
         fs.delete(curfile.getInfoFilePath());
+        LOG.warn("Mapfile " + mapfile.toString() + " does not exist. " +
+          "Cleaned up info file.  Continuing...");
+        continue;
       }
+      
+      // TODO: Confirm referent exists.
+      
       // Found map and sympathetic info file.  Add this hstorefile to result.
       results.add(curfile);
       // Keep list of sympathetic data mapfiles for cleaning info dir in next
@@ -537,8 +543,7 @@
     
     try {
       for(HStoreFile src: srcFiles) {
-        MapFile.Reader in =
-          new MapFile.Reader(fs, src.getMapFilePath().toString(), conf);
+        MapFile.Reader in = src.getReader(fs, null);
         try {
           HStoreKey readkey = new HStoreKey();
           ImmutableBytesWritable readval = new ImmutableBytesWritable();
@@ -627,12 +632,23 @@
    * <code>hsf</code> directory.
    * @param fs
    * @param hsf
+   * @return True if succeeded.
    * @throws IOException
    */
-  public void rename(final FileSystem fs, final HStoreFile hsf)
+  public boolean rename(final FileSystem fs, final HStoreFile hsf)
   throws IOException {
-    fs.rename(getMapFilePath(), hsf.getMapFilePath());
-    fs.rename(getInfoFilePath(), hsf.getInfoFilePath());
+    boolean success = fs.rename(getMapFilePath(), hsf.getMapFilePath());
+    if (!success) {
+      LOG.warn("Failed rename of " + getMapFilePath() + " to " +
+        hsf.getMapFilePath());
+      return success;
+    }
+    success = fs.rename(getInfoFilePath(), hsf.getInfoFilePath());
+    if (!success) {
+      LOG.warn("Failed rename of " + getInfoFilePath() + " to " +
+        hsf.getInfoFilePath());
+    }
+    return success;
   }
   
   /**
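
The two-step rename above, which now reports failure instead of ignoring it, can be illustrated outside Hadoop. This is a hedged sketch only: java.io.File.renameTo is used as a stand-in for the Hadoop FileSystem rename call because it also returns a boolean, and the class, method, and file names are hypothetical.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

final class PairedRenameSketch {
  /**
   * Rename the map file, then the info file. Stops at the first failure
   * and reports it to the caller rather than silently continuing.
   */
  static boolean renamePair(File mapSrc, File mapDst,
      File infoSrc, File infoDst) {
    if (!mapSrc.renameTo(mapDst)) {
      return false;  // nothing has moved; caller can bail out
    }
    // If this second rename fails, the map file has already moved.
    return infoSrc.renameTo(infoDst);
  }

  /** Small self-check run against a temporary directory. */
  static boolean demo() {
    try {
      File dir = Files.createTempDirectory("paired-rename").toFile();
      File mapSrc = new File(dir, "map.tmp");
      File infoSrc = new File(dir, "info.tmp");
      File mapDst = new File(dir, "map");
      File infoDst = new File(dir, "info");
      if (!mapSrc.createNewFile() || !infoSrc.createNewFile()) {
        return false;
      }
      boolean moved = renamePair(mapSrc, mapDst, infoSrc, infoDst);
      boolean inPlace = mapDst.exists() && infoDst.exists()
          && !mapSrc.exists() && !infoSrc.exists();
      // The sources are gone now, so a second attempt must report failure.
      boolean secondFails = !renamePair(mapSrc, mapDst, infoSrc, infoDst);
      mapDst.delete();
      infoDst.delete();
      dir.delete();
      return moved && inPlace && secondFails;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

As in the hunk, a failure of the second rename leaves the pair half-moved. The caller in HStore.java reacts to a false return by logging and returning before any old store file is deleted.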

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java?view=diff&rev=566459&r1=566458&r2=566459
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java Wed Aug 15 18:07:51 2007
@@ -301,6 +301,10 @@
     if(result == 0) {
       result = this.column.compareTo(other.column);
       if(result == 0) {
+        // The below newer timestamps sorting ahead of older timestamps looks
+        // wrong but it is intentional.  This way, newer timestamps are first
+        // found when we iterate over a memcache and newer versions are the
+        // first we trip over when reading from a store file.
         if(this.timestamp < other.timestamp) {
           result = 1;
         } else if(this.timestamp > other.timestamp) {
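
The reverse timestamp ordering described in the comment above can be checked with a small standalone comparator. This is a sketch with a hypothetical key class, not HStoreKey itself; only the ordering rule (row, then column, then newest timestamp first) is taken from the hunk and from the TestCompare change in this commit.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class NewestFirstKeySketch implements Comparable<NewestFirstKeySketch> {
  final String row;
  final String column;
  final long timestamp;

  NewestFirstKeySketch(String row, String column, long timestamp) {
    this.row = row;
    this.column = column;
    this.timestamp = timestamp;
  }

  @Override
  public int compareTo(NewestFirstKeySketch other) {
    int result = this.row.compareTo(other.row);
    if (result == 0) {
      result = this.column.compareTo(other.column);
      if (result == 0) {
        // Deliberately inverted: an older timestamp compares as "greater",
        // so newer versions come first in sorted order.
        if (this.timestamp < other.timestamp) {
          result = 1;
        } else if (this.timestamp > other.timestamp) {
          result = -1;
        }
      }
    }
    return result;
  }

  /** Sort keys that differ only by timestamp and return the timestamps. */
  static List<Long> sortedTimestamps(long... timestamps) {
    List<NewestFirstKeySketch> keys = new ArrayList<>();
    for (long ts : timestamps) {
      keys.add(new NewestFirstKeySketch("row", "col:", ts));
    }
    Collections.sort(keys);
    List<Long> out = new ArrayList<>();
    for (NewestFirstKeySketch k : keys) {
      out.add(k.timestamp);
    }
    return out;
  }
}
```

Calling sortedTimestamps(10L, 30L, 20L) yields 30, 20, 10, which is the order a scan over one row and column would encounter versions.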

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java?view=diff&rev=566459&r1=566458&r2=566459
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java Wed Aug 15 18:07:51 2007
@@ -88,6 +88,13 @@
   }
   
   /**
+   * @param b Use passed bytes as backing array for this instance.
+   */
+  public void set(final byte [] b) {
+    this.bytes = b;
+  }
+  
+  /**
    * @return the current size of the buffer.
    */
   public int getSize() {

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Writables.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Writables.java?view=diff&rev=566459&r1=566458&r2=566459
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Writables.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Writables.java Wed Aug 15 18:07:51 2007
@@ -66,15 +66,16 @@
    * @param w An empty Writable (usually made by calling the null-arg
    * constructor).
    * @return The passed Writable after its readFields has been called fed
-   * by the passed <code>bytes</code> array or null if passed null or
-   * empty <code>bytes</code>.
+   * by the passed <code>bytes</code> array.  Throws IllegalArgumentException
+   * if passed null or an empty <code>bytes</code> array.
    * @throws IOException
+   * @throws IllegalArgumentException
    */
   public static Writable getWritable(final byte [] bytes, final Writable w)
   throws IOException {
     if (bytes == null || bytes.length == 0) {
-      throw new IllegalArgumentException(
-          "Con't build a writable with empty bytes array");
+      throw new IllegalArgumentException("Can't build a writable with empty " +
+        "bytes array");
     }
     if (w == null) {
       throw new IllegalArgumentException("Writable cannot be null");

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java?view=diff&rev=566459&r1=566458&r2=566459
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java Wed Aug 15 18:07:51 2007
@@ -32,6 +32,14 @@
  * Abstract base class for test cases. Performs all static initialization
  */
 public abstract class HBaseTestCase extends TestCase {
+  public final static String COLFAMILY_NAME1 = "colfamily1:";
+  public final static String COLFAMILY_NAME2 = "colfamily2:";
+  public final static String COLFAMILY_NAME3 = "colfamily3:";
+  protected Path testDir = null;
+  protected FileSystem localFs = null;
+  public static final char FIRST_CHAR = 'a';
+  public static final char LAST_CHAR = 'z';
+  
   static {
     StaticTestEnvironment.initialize();
   }
@@ -47,6 +55,29 @@
     super(name);
     conf = new HBaseConfiguration();
   }
+  
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    this.testDir = getUnitTestdir(getName());
+    this.localFs = FileSystem.getLocal(this.conf);
+    if (localFs.exists(testDir)) {
+      localFs.delete(testDir);
+    }
+  }
+  
+  @Override
+  protected void tearDown() throws Exception {
+    try {
+      if (this.localFs != null && this.testDir != null &&
+          this.localFs.exists(testDir)) {
+        this.localFs.delete(testDir);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    super.tearDown();
+  }
 
   protected Path getUnitTestdir(String testName) {
     return new Path(StaticTestEnvironment.TEST_DIRECTORY_KEY, testName);
@@ -62,5 +93,113 @@
     return new HRegion(dir,
       new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME), conf),
       fs, conf, info, null);
+  }
+  
+  protected HTableDescriptor createTableDescriptor(final String name) {
+    HTableDescriptor htd = new HTableDescriptor(name);
+    htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME1));
+    htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME2));
+    htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME3));
+    return htd;
+  }
+  
+  protected void addContent(final HRegion r, final String column)
+  throws IOException {
+    Text startKey = r.getRegionInfo().getStartKey();
+    Text endKey = r.getRegionInfo().getEndKey();
+    byte [] startKeyBytes = startKey.getBytes();
+    if (startKeyBytes == null || startKeyBytes.length == 0) {
+      startKeyBytes = new byte [] {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
+    }
+    addContent(new HRegionLoader(r), column, startKeyBytes, endKey);
+  }
+  
+  protected void addContent(final Loader updater, final String column)
+  throws IOException {
+    addContent(updater, column,
+      new byte [] {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR}, null);
+  }
+  
+  protected void addContent(final Loader updater, final String column,
+      final byte [] startKeyBytes, final Text endKey)
+  throws IOException {
+    // Add rows of three characters.  The first character starts with the
+    // 'a' character and runs up to 'z'.  Per first character, we run the
+    // second character over same range.  And same for the third so rows
+    // (and values) look like this: 'aaa', 'aab', 'aac', etc.
+    char secondCharStart = (char)startKeyBytes[1];
+    char thirdCharStart = (char)startKeyBytes[2];
+    EXIT: for (char c = (char)startKeyBytes[0]; c <= LAST_CHAR; c++) {
+      for (char d = secondCharStart; d <= LAST_CHAR; d++) {
+        for (char e = thirdCharStart; e <= LAST_CHAR; e++) {
+          byte [] bytes = new byte [] {(byte)c, (byte)d, (byte)e};
+          Text t = new Text(new String(bytes));
+          if (endKey != null && endKey.getLength() > 0
+              && endKey.compareTo(t) <= 0) {
+            break EXIT;
+          }
+          long lockid = updater.startBatchUpdate(t);
+          try {
+            updater.put(lockid, new Text(column), bytes);
+            updater.commit(lockid);
+            lockid = -1;
+          } finally {
+            if (lockid != -1) {
+              updater.abort(lockid);
+            }
+          }
+        }
+        // Set start character back to FIRST_CHAR after we've done first loop.
+        thirdCharStart = FIRST_CHAR;
+      }
+      secondCharStart = FIRST_CHAR;
+    }
+  }
+  
+  public interface Loader {
+    public long startBatchUpdate(final Text row) throws IOException;
+    public void put(long lockid, Text column, byte val[]) throws IOException;
+    public void commit(long lockid) throws IOException;
+    public void abort(long lockid) throws IOException;
+  }
+  
+  public class HRegionLoader implements Loader {
+    final HRegion region;
+    public HRegionLoader(final HRegion HRegion) {
+      super();
+      this.region = HRegion;
+    }
+    public void abort(long lockid) throws IOException {
+      this.region.abort(lockid);
+    }
+    public void commit(long lockid) throws IOException {
+      this.region.commit(lockid, System.currentTimeMillis());
+    }
+    public void put(long lockid, Text column, byte[] val) throws IOException {
+      this.region.put(lockid, column, val);
+    }
+    public long startBatchUpdate(Text row) throws IOException {
+      return this.region.startUpdate(row);
+    }
+  }
+  
+  public class HTableLoader implements Loader {
+    final HTable table;
+    public HTableLoader(final HTable table) {
+      super();
+      this.table = table;
+    }
+    public void abort(long lockid) throws IOException {
+      this.table.abort(lockid);
+    }
+    public void commit(long lockid) throws IOException {
+      this.table.commit(lockid);
+    }
+    public void put(long lockid, Text column, byte[] val) throws IOException {
+      this.table.put(lockid, column, val);
+    }
+    public long startBatchUpdate(Text row) {
+      return this.table.startBatchUpdate(row);
+    }
   }
 }
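
The row generation done by addContent in the HBaseTestCase.java hunk above can be sketched without any HBase classes. This is a minimal sketch: the class and method names are hypothetical, plain strings stand in for Text, and rows are collected into a list rather than written through a Loader.

```java
import java.util.ArrayList;
import java.util.List;

final class RowKeySketch {
  static final char FIRST_CHAR = 'a';
  static final char LAST_CHAR = 'z';

  /**
   * Generate three-character rows from startKey (inclusive) up to endKey
   * (exclusive). A null or empty endKey means "run through 'zzz'".
   */
  static List<String> rows(String startKey, String endKey) {
    List<String> out = new ArrayList<>();
    char secondStart = startKey.charAt(1);
    char thirdStart = startKey.charAt(2);
    exit:
    for (char c = startKey.charAt(0); c <= LAST_CHAR; c++) {
      for (char d = secondStart; d <= LAST_CHAR; d++) {
        for (char e = thirdStart; e <= LAST_CHAR; e++) {
          String row = new String(new char[] {c, d, e});
          if (endKey != null && !endKey.isEmpty()
              && endKey.compareTo(row) <= 0) {
            break exit;  // reached the end of the region's key range
          }
          out.add(row);
        }
        // Only the first pass starts mid-range; later passes start at 'a'.
        thirdStart = FIRST_CHAR;
      }
      secondStart = FIRST_CHAR;
    }
    return out;
  }
}
```

With startKey "aaa" and no end key this produces all 26 * 26 * 26 = 17576 rows. Resetting the second and third start characters after the first pass is what lets a region whose start key is mid-range, such as "ayz", continue with "aza" rather than skipping ahead.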

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java?view=diff&rev=566459&r1=566458&r2=566459
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java Wed Aug 15 18:07:51 2007
@@ -365,7 +365,9 @@
     shutdown(this.masterThread, this.regionThreads);
     // Close the file system.  Will complain if files open so helps w/ leaks.
     try {
-      this.cluster.getFileSystem().close();
+      if (this.cluster != null && this.cluster.getFileSystem() != null) {
+        this.cluster.getFileSystem().close();
+      }
     } catch (IOException e) {
       LOG.error("Closing down dfs", e);
     }

Added: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java?view=auto&rev=566459
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java Wed Aug 15 18:07:51 2007
@@ -0,0 +1,101 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Test compactions
+ */
+public class TestCompaction extends HBaseTestCase {
+  static final Log LOG = LogFactory.getLog(TestCompaction.class.getName());
+
+  protected void setUp() throws Exception {
+    super.setUp();
+  }
+  
+  protected void tearDown() throws Exception {
+    super.tearDown();
+  }
+  
+  /**
+   * Run compaction and flushing memcache
+   * @throws Exception
+   */
+  public void testCompaction() throws Exception {
+    HLog hlog = new HLog(this.localFs, this.testDir, this.conf);
+    HTableDescriptor htd = createTableDescriptor(getName());
+    HRegionInfo hri = new HRegionInfo(1, htd, null, null);
+    final HRegion r =
+      new HRegion(testDir, hlog, this.localFs, this.conf, hri, null);
+    try {
+      createStoreFile(r);
+      assertFalse(r.needsCompaction());
+      int compactionThreshold =
+        this.conf.getInt("hbase.hstore.compactionThreshold", 3);
+      for (int i = 0; i < compactionThreshold; i++) {
+        createStoreFile(r);
+      }
+      assertTrue(r.needsCompaction());
+      // Try to run compaction concurrent with a thread flush.
+      addContent(new HRegionLoader(r), COLFAMILY_NAME1);
+      Thread t1 = new Thread() {
+        @Override
+        public void run() {
+          try {
+            r.flushcache(false);
+          } catch (IOException e) {
+            e.printStackTrace();
+          }
+        }
+      };
+      Thread t2 = new Thread() {
+        @Override
+        public void run() {
+          try {
+            assertTrue(r.compactStores());
+          } catch (IOException e) {
+            e.printStackTrace();
+          }
+        }
+      };
+      t1.setDaemon(true);
+      t1.start();
+      t2.setDaemon(true);
+      t2.start();
+      t1.join();
+      t2.join();
+    } finally {
+      r.close();
+      hlog.closeAndDelete();
+    }
+  }
+  
+  private void createStoreFile(final HRegion r) throws IOException {
+    HRegionLoader loader = new HRegionLoader(r);
+    for (int i = 0; i < 3; i++) {
+      addContent(loader, COLFAMILY_NAME1);
+    }
+    r.flushcache(false);
+  }
+}
\ No newline at end of file

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompare.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompare.java?view=diff&rev=566459&r1=566458&r2=566459
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompare.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompare.java Wed Aug 15 18:07:51 2007
@@ -26,7 +26,27 @@
  * Test comparing HBase objects.
  */
 public class TestCompare extends TestCase {
-  /** test case */
+  
+  /**
+   * HStoreKey sorts as you would expect in the row and column portions but
+   * for the timestamps, it sorts in reverse with the newest sorting before
+   * the oldest (This is intentional so we trip over the latest first when
+   * iterating or looking in store files).
+   */
+  public void testHStoreKey() {
+    long timestamp = System.currentTimeMillis();
+    Text a = new Text("a");
+    HStoreKey past = new HStoreKey(a, a, timestamp - 10);
+    HStoreKey now = new HStoreKey(a, a, timestamp);
+    HStoreKey future = new HStoreKey(a, a, timestamp + 10);
+    assertTrue(past.compareTo(now) > 0);
+    assertTrue(now.compareTo(now) == 0);
+    assertTrue(future.compareTo(now) < 0);
+  }
+  
+  /**
+   * Sort of HRegionInfo.
+   */
   public void testHRegionInfo() {
     HRegionInfo a = new HRegionInfo(1, new HTableDescriptor("a"), null, null);
     HRegionInfo b = new HRegionInfo(2, new HTableDescriptor("b"), null, null);