Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/04/03 22:34:30 UTC

svn commit: r525267 [4/5] - in /lucene/hadoop/trunk: ./ src/contrib/hbase/ src/contrib/hbase/src/ src/contrib/hbase/src/java/ src/contrib/hbase/src/java/org/ src/contrib/hbase/src/java/org/apache/ src/contrib/hbase/src/java/org/apache/hadoop/ src/contr...

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Tue Apr  3 13:34:28 2007
@@ -0,0 +1,976 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.*;
+
+import java.io.*;
+import java.util.*;
+
+/*******************************************************************************
+ * HStore maintains a collection of data files.  It is responsible for 
+ * maintaining the memory/file hierarchy, for periodically flushing in-memory 
+ * edits to disk, and for compacting the resulting files.
+ *
+ * Locking and transactions are handled at a higher level.  This API should not 
+ * be called directly by any writer, but rather by an HRegion manager.
+ ******************************************************************************/
+public class HStore {
+  private static final Log LOG = LogFactory.getLog(HStore.class);
+
+  static final String COMPACTION_DIR = "compaction.tmp";
+  static final String WORKING_COMPACTION = "compaction.inprogress";
+  static final String COMPACTION_TO_REPLACE = "toreplace";    
+  static final String COMPACTION_DONE = "done";
+
+  Path dir;
+  Text regionName;
+  Text colFamily;
+  int maxVersions;
+  FileSystem fs;
+  Configuration conf;
+  Path mapdir;
+  Path compactdir;
+  Path loginfodir;
+
+  Integer compactLock = new Integer(0);
+  Integer flushLock = new Integer(0);
+
+  HLocking locking = new HLocking();
+
+  TreeMap<Long, MapFile.Reader> maps = new TreeMap<Long, MapFile.Reader>();
+  TreeMap<Long, HStoreFile> mapFiles = new TreeMap<Long, HStoreFile>();
+
+  Random rand = new Random();
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Constructors, destructors, etc
+  //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * An HStore is a set of zero or more MapFiles, which stretch backwards over 
+   * time.  A given HStore is responsible for a certain set of columns for a row
+   * in the HRegion.
+   *
+   * The HRegion starts writing to its set of HStores when the HRegion's 
+   * memcache is flushed.  This results in a round of new MapFiles, one for
+   * each HStore.
+   *
+   * There's no reason to consider append-logging at this level; all logging 
+   * and locking is handled at the HRegion level.  HStore just provides 
+   * services to manage sets of MapFiles.  One of the most important of those 
+   * services is MapFile compaction.
+   *
+   * The only thing having to do with logs that HStore needs to deal with is
+   * the reconstructionLog.  This is a segment of an HRegion's log that might
+   * be present upon startup.  If the param is NULL, there's nothing to do.
+   * If the param is non-NULL, we need to process the log to reconstruct
+   * a TreeMap that might not have been written to disk before the process died.
+   *
+   * It's assumed that after this constructor returns, the reconstructionLog file
+   * will be deleted (by whoever has instantiated the HStore).
+   */
+  public HStore(Path dir, Text regionName, Text colFamily, int maxVersions, 
+      FileSystem fs, Path reconstructionLog, Configuration conf) throws IOException {
+    
+    this.dir = dir;
+    this.regionName = regionName;
+    this.colFamily = colFamily;
+    this.maxVersions = maxVersions;
+    this.fs = fs;
+    this.conf = conf;
+
+    this.mapdir = HStoreFile.getMapDir(dir, regionName, colFamily);
+    fs.mkdirs(mapdir);
+    this.loginfodir = HStoreFile.getInfoDir(dir, regionName, colFamily);
+    fs.mkdirs(loginfodir);
+
+    LOG.debug("starting HStore for " + regionName + "/"+ colFamily);
+    
+    // Either restart or get rid of any leftover compaction work.  Either way, 
+    // by the time processReadyCompaction() returns, we can get rid of the 
+    // existing compaction-dir.
+
+    this.compactdir = new Path(dir, COMPACTION_DIR);
+    Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, colFamily);
+    if(fs.exists(curCompactStore)) {
+      processReadyCompaction();
+      fs.delete(curCompactStore);
+    }
+
+    // Go through the 'mapdir' and 'loginfodir' together, make sure that all 
+    // MapFiles are in a reliable state.  Every entry in 'mapdir' must have a 
+    // corresponding one in 'loginfodir'.  Without a corresponding log info 
+    // file, the entry in 'mapdir' must be deleted.
+
+    Vector<HStoreFile> hstoreFiles 
+      = HStoreFile.loadHStoreFiles(conf, dir, regionName, colFamily, fs);
+    
+    for(Iterator<HStoreFile> it = hstoreFiles.iterator(); it.hasNext(); ) {
+      HStoreFile hsf = it.next();
+      mapFiles.put(hsf.loadInfo(fs), hsf);
+    }
+
+    // Now go through all the HSTORE_LOGINFOFILEs and figure out the most-recent
+    // log-seq-ID that's present.  The most-recent such ID means we can ignore 
+    // all log messages up to and including that ID (because they're already 
+    // reflected in the TreeMaps).
+    //
+    // If the HSTORE_LOGINFOFILE doesn't contain a number, just ignore it.  That
+    // means it was built prior to the previous run of HStore, and so it cannot 
+    // contain any updates also contained in the log.
+
+    long maxSeqID = -1;
+    for(Iterator<HStoreFile> it = hstoreFiles.iterator(); it.hasNext(); ) {
+      HStoreFile hsf = it.next();
+      long seqid = hsf.loadInfo(fs);
+      if(seqid > 0) {
+        if(seqid > maxSeqID) {
+          maxSeqID = seqid;
+        }
+      }
+    }
+
+    // Read the reconstructionLog to see whether we need to build a brand-new 
+    // MapFile out of non-flushed log entries.  
+    //
+    // We can ignore any log message that has a sequence ID that's equal to or 
+    // lower than maxSeqID.  (Because we know such log messages are already 
+    // reflected in the MapFiles.)
+
+    LOG.debug("reading reconstructionLog");
+    
+    if(reconstructionLog != null && fs.exists(reconstructionLog)) {
+      long maxSeqIdInLog = -1;
+      TreeMap<HStoreKey, BytesWritable> reconstructedCache 
+        = new TreeMap<HStoreKey, BytesWritable>();
+      
+      SequenceFile.Reader login 
+        = new SequenceFile.Reader(fs, reconstructionLog, conf);
+      
+      try {
+        HLogKey key = new HLogKey();
+        HLogEdit val = new HLogEdit();
+        while(login.next(key, val)) {
+          maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum());
+          if(key.getLogSeqNum() <= maxSeqID) {
+            continue;
+          }
+          reconstructedCache.put(new HStoreKey(key.getRow(), val.getColumn(), 
+              val.getTimestamp()), val.getVal());
+        }
+        
+      } finally {
+        login.close();
+      }
+
+      if(reconstructedCache.size() > 0) {
+        
+        // We create a "virtual flush" at maxSeqIdInLog+1.
+        
+        LOG.debug("flushing reconstructionCache");
+        
+        flushCacheHelper(reconstructedCache, maxSeqIdInLog+1, true);
+      }
+    }
+
+    // Compact all the MapFiles into a single file.  The resulting MapFile 
+    // should be "timeless"; that is, it should not have an associated seq-ID, 
+    // because all log messages have been reflected in the TreeMaps at this point.
+    
+    if(mapFiles.size() >= 1) {
+      compactHelper(true);
+    }
+
+    // Finally, start up all the map readers! (There should be just one at this 
+    // point, as we've compacted them all.)
+
+    LOG.debug("starting map readers");
+    
+    for(Iterator<Long> it = mapFiles.keySet().iterator(); it.hasNext(); ) {
+      Long key = it.next();
+      HStoreFile hsf = mapFiles.get(key);
+
+      //TODO - is this really necessary?  Don't I do this inside compact()?
+      
+      maps.put(key, new MapFile.Reader(fs, hsf.getMapFilePath().toString(), conf));
+    }
+    
+    LOG.info("HStore online for " + this.regionName + "/" + this.colFamily);
+  }
+
+  /** Turn off all the MapFile readers */
+  public void close() throws IOException {
+    locking.obtainWriteLock();
+    LOG.info("closing HStore for " + this.regionName + "/" + this.colFamily);
+    
+    try {
+      for(Iterator<MapFile.Reader> it = maps.values().iterator(); it.hasNext(); ) {
+        MapFile.Reader map = it.next();
+        map.close();
+      }
+      maps.clear();
+      mapFiles.clear();
+      
+      LOG.info("HStore closed for " + this.regionName + "/" + this.colFamily);
+      
+    } finally {
+      locking.releaseWriteLock();
+    }
+  }
+
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Flush changes to disk
+  //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Write out a brand-new set of items to the disk.
+   *
+   * We should only store key/vals that are appropriate for the data-columns 
+   * stored in this HStore.
+   *
+   * Also, we are not expecting any reads of this MapFile just yet.
+   *
+   * Return the entire list of HStoreFiles currently used by the HStore.
+   */
+  public Vector<HStoreFile> flushCache(TreeMap<HStoreKey, BytesWritable> inputCache,
+      long logCacheFlushId) throws IOException {
+    
+    return flushCacheHelper(inputCache, logCacheFlushId, true);
+  }
+  
+  Vector<HStoreFile> flushCacheHelper(TreeMap<HStoreKey, BytesWritable> inputCache,
+      long logCacheFlushId, boolean addToAvailableMaps) throws IOException {
+    
+    synchronized(flushLock) {
+      LOG.debug("flushing HStore " + this.regionName + "/" + this.colFamily);
+      
+      // A. Write the TreeMap out to the disk
+
+      HStoreFile flushedFile 
+        = HStoreFile.obtainNewHStoreFile(conf, dir, regionName, colFamily, fs);
+      
+      Path mapfile = flushedFile.getMapFilePath();
+      MapFile.Writer out = new MapFile.Writer(conf, fs, mapfile.toString(), 
+          HStoreKey.class, BytesWritable.class);
+      
+      try {
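+        // Only persist cells belonging to this HStore's column family; the 
+        // cache handed to us may hold edits for other families in the region.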
+        for(Iterator<HStoreKey> it = inputCache.keySet().iterator(); it.hasNext(); ) {
+          HStoreKey curkey = it.next();
+          if(this.colFamily.equals(HStoreKey.extractFamily(curkey.getColumn()))) {
+            BytesWritable val = inputCache.get(curkey);
+            out.append(curkey, val);
+          }
+        }
+        LOG.debug("HStore " + this.regionName + "/" + this.colFamily + " flushed");
+        
+      } finally {
+        out.close();
+      }
+
+      // B. Write out the log sequence number that corresponds to this output
+      // MapFile.  The MapFile is current up to and including the log seq num.
+
+      LOG.debug("writing log cache flush id");
+      flushedFile.writeInfo(fs, logCacheFlushId);
+
+      // C. Finally, make the new MapFile available.
+
+      if(addToAvailableMaps) {
+        locking.obtainWriteLock();
+        
+        try {
+          maps.put(logCacheFlushId, new MapFile.Reader(fs, mapfile.toString(), conf));
+          mapFiles.put(logCacheFlushId, flushedFile);
+          LOG.debug("HStore available for " + this.regionName + "/" + this.colFamily);
+        
+        } finally {
+          locking.releaseWriteLock();
+        }
+      }
+      return getAllMapFiles();
+    }
+  }
+
+  public Vector<HStoreFile> getAllMapFiles() {
+    Vector<HStoreFile> flushedFiles = new Vector<HStoreFile>();
+    for(Iterator<HStoreFile> it = mapFiles.values().iterator(); it.hasNext(); ) {
+      HStoreFile hsf = it.next();
+      flushedFiles.add(hsf);
+    }
+    return flushedFiles;
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Compaction
+  //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Compact the backing HStoreFiles.  This method may take some time, so the calling 
+   * thread must be able to block for long periods.
+   * 
+   * During this time, the HStore can work as usual, getting values from MapFiles
+   * and writing new MapFiles from given memcaches.
+   * 
+   * Existing MapFiles are not destroyed until the new compacted TreeMap is 
+   * completely written-out to disk.
+   *
+   * The compactLock prevents multiple simultaneous compactions.
+   * The HLocking write lock prevents us from interfering with other write 
+   * operations.
+   * 
+   * We don't want to hold the write lock for the whole time, as a compact() 
+   * can be lengthy and we want to allow cache-flushes during this period.
+   */
+  public void compact() throws IOException {
+    compactHelper(false);
+  }
+  
+  void compactHelper(boolean deleteSequenceInfo) throws IOException {
+    synchronized(compactLock) {
+      LOG.debug("started compaction of " + this.regionName + "/" + this.colFamily);
+      
+      Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, colFamily);
+      fs.mkdirs(curCompactStore);
+      
+      try {
+        
+        // Grab a list of files to compact.
+        
+        Vector<HStoreFile> toCompactFiles = null;
+        locking.obtainWriteLock();
+        try {
+          toCompactFiles = new Vector<HStoreFile>(mapFiles.values());
+          
+        } finally {
+          locking.releaseWriteLock();
+        }
+
+        // Compute the max-sequenceID seen in any of the to-be-compacted TreeMaps
+
+        long maxSeenSeqID = -1;
+        for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext(); ) {
+          HStoreFile hsf = it.next();
+          long seqid = hsf.loadInfo(fs);
+          if(seqid > 0) {
+            if(seqid > maxSeenSeqID) {
+              maxSeenSeqID = seqid;
+            }
+          }
+        }
+        LOG.debug("max sequence id =" + maxSeenSeqID);
+        
+        HStoreFile compactedOutputFile 
+          = new HStoreFile(conf, compactdir, regionName, colFamily, -1);
+        
+        if(toCompactFiles.size() == 1) {
+          LOG.debug("nothing to compact for " + this.regionName + "/" + this.colFamily);
+          
+          HStoreFile hsf = toCompactFiles.elementAt(0);
+          if(hsf.loadInfo(fs) == -1) {
+            return;
+          }
+        }
+
+        // Step through them, writing to the brand-new TreeMap
+
+        MapFile.Writer compactedOut = new MapFile.Writer(conf, fs, 
+            compactedOutputFile.getMapFilePath().toString(), HStoreKey.class, 
+            BytesWritable.class);
+        
+        try {
+
+          // We create a new set of MapFile.Reader objects so we don't screw up 
+          // the caching associated with the currently-loaded ones.
+          //
+          // Our iteration-based access pattern is practically designed to ruin 
+          // the cache.
+          //
+          // We work by opening a single MapFile.Reader for each file, and 
+          // iterating through them in parallel.  We always increment the 
+          // lowest-ranked one.  Updates to a single row/column will appear 
+          // ranked by timestamp.  This allows us to throw out deleted values or
+          // obsolete versions.
+
+          MapFile.Reader[] readers = new MapFile.Reader[toCompactFiles.size()];
+          HStoreKey[] keys = new HStoreKey[toCompactFiles.size()];
+          BytesWritable[] vals = new BytesWritable[toCompactFiles.size()];
+          boolean[] done = new boolean[toCompactFiles.size()];
+          int pos = 0;
+          for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext(); ) {
+            HStoreFile hsf = it.next();
+            readers[pos] = new MapFile.Reader(fs, hsf.getMapFilePath().toString(), conf);
+            keys[pos] = new HStoreKey();
+            vals[pos] = new BytesWritable();
+            done[pos] = false;
+            pos++;
+          }
+
+          // Now, advance through the readers in order.  This will have the
+          // effect of a run-time sort of the entire dataset.
+
+          LOG.debug("processing HStoreFile readers");
+          
+          int numDone = 0;
+          for(int i = 0; i < readers.length; i++) {
+            readers[i].reset();
+            done[i] = ! readers[i].next(keys[i], vals[i]);
+            if(done[i]) {
+              numDone++;
+            }
+          }
+          
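+          // timesSeen counts how many versions of the current row/column have 
+          // been written to the output so far; lastRow/lastColumn remember 
+          // which cell that is.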
+          int timesSeen = 0;
+          Text lastRow = new Text();
+          Text lastColumn = new Text();
+          while(numDone < done.length) {
+
+            // Find the reader with the smallest key
+
+            int smallestKey = -1;
+            for(int i = 0; i < readers.length; i++) {
+              if(done[i]) {
+                continue;
+              }
+              
+              if(smallestKey < 0) {
+                smallestKey = i;
+              
+              } else {
+                if(keys[i].compareTo(keys[smallestKey]) < 0) {
+                  smallestKey = i;
+                }
+              }
+            }
+
+            // Reflect the current key/val in the output
+
+            HStoreKey sk = keys[smallestKey];
+            if(lastRow.equals(sk.getRow())
+                && lastColumn.equals(sk.getColumn())) {
+              
+              timesSeen++;
+              
+            } else {
+              timesSeen = 1;
+            }
+            
+            if(timesSeen <= maxVersions) {
+
+              // Keep old versions until we have maxVersions worth.
+              // Then just skip them.
+
+              if(sk.getRow().getLength() != 0
+                  && sk.getColumn().getLength() != 0) {
+                
+                // Only write out objects which have a non-zero length key and value
+
+                compactedOut.append(sk, vals[smallestKey]);
+              }
+              
+            }
+
+            //TODO: I don't know what to do about deleted values.  I currently 
+            // include the fact that the item was deleted as a legitimate 
+            // "version" of the data.  Maybe it should just drop the deleted val?
+
+            // Update last-seen items
+
+            lastRow.set(sk.getRow());
+            lastColumn.set(sk.getColumn());
+
+            // Advance the smallest key.  If that reader's all finished, then 
+            // mark it as done.
+
+            if(! readers[smallestKey].next(keys[smallestKey], vals[smallestKey])) {
+              done[smallestKey] = true;
+              readers[smallestKey].close();
+              numDone++;
+            }
+          }
+          
+          LOG.debug("all HStores processed");
+          
+        } finally {
+          compactedOut.close();
+        }
+
+        LOG.debug("writing new compacted HStore");
+
+        // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
+
+        if((! deleteSequenceInfo) && maxSeenSeqID >= 0) {
+          compactedOutputFile.writeInfo(fs, maxSeenSeqID);
+          
+        } else {
+          compactedOutputFile.writeInfo(fs, -1);
+        }
+
+        // Write out a list of data files that we're replacing
+
+        Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
+        DataOutputStream out = new DataOutputStream(fs.create(filesToReplace));
+        try {
+          out.writeInt(toCompactFiles.size());
+          for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext(); ) {
+            HStoreFile hsf = it.next();
+            hsf.write(out);
+          }
+          
+        } finally {
+          out.close();
+        }
+
+        // Indicate that we're done.
+
+        Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
+        out = new DataOutputStream(fs.create(doneFile));
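+        // The 'done' file is intentionally left empty; its existence alone 
+        // tells processReadyCompaction() that the compaction output is complete.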
+        
+        try {
+        } finally {
+          out.close();
+        }
+
+        // Move the compaction into place.
+
+        processReadyCompaction();
+        
+        LOG.debug("compaction complete for " + this.regionName + "/" + this.colFamily);
+
+      } finally {
+        fs.delete(compactdir);
+      }
+    }
+  }
+
+  /**
+   * It's assumed that the compactLock will be acquired prior to calling this 
+   * method!  Otherwise, it is not thread-safe!
+   *
+   * It works by processing a compaction that's been written to disk.
+   * 
+   * It is usually invoked at the end of a compaction, but might also be invoked
+   * at HStore startup, if the prior execution died midway through.
+   */
+  void processReadyCompaction() throws IOException {
+
+    // Move the compacted TreeMap into place.
+    // That means:
+    // 1) Acquiring the write-lock
+    // 2) Figuring out what MapFiles are going to be replaced
+    // 3) Unloading all the replaced MapFiles.
+    // 4) Deleting all the old MapFile files.
+    // 5) Moving the new MapFile into place
+    // 6) Loading the new TreeMap.
+    // 7) Releasing the write-lock
+
+    // 1. Acquiring the write-lock
+
+    locking.obtainWriteLock();
+    Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, colFamily);
+    try {
+      Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
+      if(! fs.exists(doneFile)) {
+        
+        // The last execution didn't finish the compaction, so there's nothing 
+        // we can do.  We'll just have to redo it. Abandon it and return.
+        
+        return;
+      }
+
+      // OK, there's actually compaction work that needs to be put into place.
+
+      LOG.debug("compaction starting");
+      
+      // 2. Load in the files to be deleted.
+      //    (Figuring out what MapFiles are going to be replaced)
+      
+      Vector<HStoreFile> toCompactFiles = new Vector<HStoreFile>();
+      Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
+      DataInputStream in = new DataInputStream(fs.open(filesToReplace));
+      try {
+        int numfiles = in.readInt();
+        for(int i = 0; i < numfiles; i++) {
+          HStoreFile hsf = new HStoreFile(conf);
+          hsf.readFields(in);
+          toCompactFiles.add(hsf);
+        }
+        
+      } finally {
+        in.close();
+      }
+
+      LOG.debug("loaded files to be deleted");
+      
+      // 3. Unload all the replaced MapFiles.
+      
+      Iterator<HStoreFile> it2 = mapFiles.values().iterator();
+      for(Iterator<MapFile.Reader> it = maps.values().iterator(); it.hasNext(); ) {
+        MapFile.Reader curReader = it.next();
+        HStoreFile curMapFile = it2.next();
+        if(toCompactFiles.contains(curMapFile)) {
+          curReader.close();
+          it.remove();
+        }
+      }
+      
+      for(Iterator<HStoreFile> it = mapFiles.values().iterator(); it.hasNext(); ) {
+        HStoreFile curMapFile = it.next();
+        if(toCompactFiles.contains(curMapFile)) {
+          it.remove();
+        }
+      }
+
+      LOG.debug("unloaded existing MapFiles");
+      
+      // What if we crash at this point?  No big deal; we will restart
+      // processReadyCompaction(), and nothing has been lost.
+
+      // 4. Delete all the old files, no longer needed
+      
+      for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext(); ) {
+        HStoreFile hsf = it.next();
+        fs.delete(hsf.getMapFilePath());
+        fs.delete(hsf.getInfoFilePath());
+      }
+
+      LOG.debug("old files deleted");
+      
+      // What if we fail now?  The above deletes will fail silently. We'd better
+      // make sure not to write out any new files with the same names as 
+      // something we delete, though.
+
+      // 5. Moving the new MapFile into place
+      
+      LOG.debug("moving new MapFile into place");
+      
+      HStoreFile compactedFile 
+        = new HStoreFile(conf, compactdir, regionName, colFamily, -1);
+      
+      HStoreFile finalCompactedFile 
+        = HStoreFile.obtainNewHStoreFile(conf, dir, regionName, colFamily, fs);
+      
+      fs.rename(compactedFile.getMapFilePath(), finalCompactedFile.getMapFilePath());
+      
+      // Fail here?  No problem.
+      
+      fs.rename(compactedFile.getInfoFilePath(), finalCompactedFile.getInfoFilePath());
+
+      // Fail here?  No worries.
+      
+      long orderVal = finalCompactedFile.loadInfo(fs);
+
+      // 6. Loading the new TreeMap.
+      
+      LOG.debug("loading new TreeMap");
+      
+      mapFiles.put(orderVal, finalCompactedFile);
+      maps.put(orderVal, new MapFile.Reader(fs, 
+          finalCompactedFile.getMapFilePath().toString(), conf));
+      
+    } finally {
+      
+      // 7. Releasing the write-lock
+      
+      locking.releaseWriteLock();
+    }
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Accessors.  
+  // (This is the only section that is directly useful!)
+  //////////////////////////////////////////////////////////////////////////////
+  
+  /**
+   * Return all the available columns for the given key.  The key indicates a 
+   * row and timestamp, but not a column name.
+   *
+   * Results are added to the supplied 'results' map as column name -> byte[].
+   */
+  public void getFull(HStoreKey key, TreeMap<Text, byte[]> results) throws IOException {
+    locking.obtainReadLock();
+    try {
+      MapFile.Reader[] maparray 
+        = maps.values().toArray(new MapFile.Reader[maps.size()]);
+      
+      for(int i = maparray.length-1; i >= 0; i--) {
+        MapFile.Reader map = maparray[i];
+
+        synchronized(map) {
+          BytesWritable readval = new BytesWritable();
+          map.reset();
+          HStoreKey readkey = (HStoreKey)map.getClosest(key, readval);
+          
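+          // Walk forward from the closest matching key, keeping the first 
+          // (newest) value seen for each column of the requested row.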
+          do {
+            Text readcol = readkey.getColumn();
+            if(results.get(readcol) == null
+                && key.matchesWithoutColumn(readkey)) {
+              results.put(new Text(readcol), readval.get());
+              readval = new BytesWritable();
+              
+            } else if(readkey.getRow().compareTo(key.getRow()) > 0) {
+              // We have scanned past the requested row; nothing more can match.
+              break;
+            }
+            
+          } while(map.next(readkey, readval));
+        }
+      }
+      
+    } finally {
+      locking.releaseReadLock();
+    }
+  }
+
+  /**
+   * Get the value for the indicated HStoreKey.  Grab the target value and the 
+   * previous 'numVersions-1' values, as well.
+   *
+   * If 'numVersions' is negative, the method returns all available versions.
+   */
+  public byte[][] get(HStoreKey key, int numVersions) throws IOException {
+    if(numVersions == 0) {
+      throw new IllegalArgumentException("Must request at least one value.");
+    }
+    
+    Vector<byte[]> results = new Vector<byte[]>();
+    locking.obtainReadLock();
+    try {
+      MapFile.Reader[] maparray 
+        = maps.values().toArray(new MapFile.Reader[maps.size()]);
+      
+      for(int i = maparray.length-1; i >= 0; i--) {
+        MapFile.Reader map = maparray[i];
+
+        synchronized(map) {
+          BytesWritable readval = new BytesWritable();
+          map.reset();
+          HStoreKey readkey = (HStoreKey)map.getClosest(key, readval);
+          
+          if(readkey.matchesRowCol(key)) {
+            results.add(readval.get());
+            readval = new BytesWritable();
+
+            while(map.next(readkey, readval) && readkey.matchesRowCol(key)) {
+              if(numVersions > 0 && (results.size() >= numVersions)) {
+                break;
+                
+              } else {
+                results.add(readval.get());
+                readval = new BytesWritable();
+              }
+            }
+          }
+        }
+        if(numVersions > 0 && results.size() >= numVersions) {
+          break;
+        }
+      }
+
+      if(results.size() == 0) {
+        return null;
+        
+      } else {
+        return (byte[][]) results.toArray(new byte[results.size()][]);
+      }
+      
+    } finally {
+      locking.releaseReadLock();
+    }
+  }
+
+  /**
+   * Gets the size of the largest MapFile and its mid key.
+   * 
+   * @param midKey      - (out parameter) set to the middle key of the largest MapFile
+   * @return            - size of the largest MapFile
+   * @throws IOException
+   */
+  public long getLargestFileSize(Text midKey) throws IOException {
+    long maxSize = 0L;
+    long mapIndex = 0L;
+
+    // Iterate through all the MapFiles
+    
+    for(Iterator<Map.Entry<Long, HStoreFile>> it = mapFiles.entrySet().iterator();
+        it.hasNext(); ) {
+      
+      Map.Entry<Long, HStoreFile> e = it.next();
+      HStoreFile curHSF = e.getValue();
+      long size = fs.getLength(new Path(curHSF.getMapFilePath(), MapFile.DATA_FILE_NAME));
+      
+      if(size > maxSize) {              // This is the largest one so far
+        maxSize = size;
+        mapIndex = e.getKey();
+      }
+    }
+
+    MapFile.Reader r = maps.get(mapIndex);
+    
+    synchronized(r) {
+      midKey.set(((HStoreKey)r.midKey()).getRow());
+    }
+    
+    return maxSize;
+  }
+  
+  //////////////////////////////////////////////////////////////////////////////
+  // File administration
+  //////////////////////////////////////////////////////////////////////////////
+
+  /** Generate a random unique filename suffix */
+  String obtainFileLabel(Path prefix) throws IOException {
+    String testsuffix = String.valueOf(Math.abs(rand.nextInt()));
+    Path testpath = new Path(prefix.toString() + testsuffix);
+    while(fs.exists(testpath)) {
+      testsuffix = String.valueOf(Math.abs(rand.nextInt()));
+      testpath = new Path(prefix.toString() + testsuffix);
+    }
+    return testsuffix;
+  }
+
+  /**
+   * Return a scanner over this HStore's contents.  It opens one MapFile.Reader 
+   * per HStoreFile; the scanner must be closed when the caller is done with it.
+   */
+  public HScannerInterface getScanner(long timestamp, Text targetCols[],
+      Text firstRow) throws IOException {
+    
+    return new HStoreScanner(timestamp, targetCols, firstRow);
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // This class implements the HScannerInterface.
+  // It lets the caller scan the contents of this HStore.
+  //////////////////////////////////////////////////////////////////////////////
+  
+  class HStoreScanner extends HAbstractScanner {
+    MapFile.Reader readers[];
+    Text lastRow = null;
+    
+    public HStoreScanner(long timestamp, Text targetCols[], Text firstRow) throws IOException {
+      super(timestamp, targetCols);
+
+      locking.obtainReadLock();
+      try {
+        this.readers = new MapFile.Reader[mapFiles.size()];
+        int i = 0;
+        for(Iterator<HStoreFile> it = mapFiles.values().iterator(); it.hasNext(); ) {
+          HStoreFile curHSF = it.next();
+          readers[i++] = new MapFile.Reader(fs, curHSF.getMapFilePath().toString(), conf);
+        }
+        
+        this.keys = new HStoreKey[readers.length];
+        this.vals = new BytesWritable[readers.length];
+
+        // Advance the readers to the first pos.
+
+        for(i = 0; i < readers.length; i++) {
+          keys[i] = new HStoreKey();
+          vals[i] = new BytesWritable();
+
+          if(firstRow.getLength() != 0) {
+            if(findFirstRow(i, firstRow)) {
+              continue;
+            }
+          }
+          
+          while(getNext(i)) {
+            if(columnMatch(i)) {
+              break;
+            }
+          }
+        }
+        
+      } catch (Exception ex) {
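+        // Setup failed: shut the scanner down.  close() closes any readers 
+        // opened so far and releases the read lock.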
+        close();
+      }
+    }
+
+    /**
+     * The user didn't want to start scanning at the first row. This method
+     * seeks to the requested row.
+     *
+     * @param i         - which iterator to advance
+     * @param firstRow  - seek to this row
+     * @return          - true if the row was found and its columns match, or 
+     *                    if the row was not found (the sub-scanner is then closed)
+     */
+    boolean findFirstRow(int i, Text firstRow) throws IOException {
+      HStoreKey firstKey
+        = (HStoreKey)readers[i].getClosest(new HStoreKey(firstRow), vals[i]);
+      
+      if(firstKey == null) {
+        
+        // Didn't find it. Close the scanner and return TRUE
+        
+        closeSubScanner(i);
+        return true;
+      }
+      keys[i].setRow(firstKey.getRow());
+      keys[i].setColumn(firstKey.getColumn());
+      keys[i].setVersion(firstKey.getTimestamp());
+      return columnMatch(i);
+    }
+    
+    /**
+     * Get the next value from the specified reader.
+     * 
+     * @param i - which reader to fetch next value from
+     * @return - true if there is more data available
+     */
+    boolean getNext(int i) throws IOException {
+      if(! readers[i].next(keys[i], vals[i])) {
+        closeSubScanner(i);
+        return false;
+      }
+      return true;
+    }
+    
+    /** Close down the indicated reader. */
+    void closeSubScanner(int i) throws IOException {
+      try {
+        if(readers[i] != null) {
+          readers[i].close();
+        }
+        
+      } finally {
+        readers[i] = null;
+        keys[i] = null;
+        vals[i] = null;
+      }
+    }
+
+    /** Shut it down! */
+    public void close() throws IOException {
+      if(! scannerClosed) {
+        try {
+          for(int i = 0; i < readers.length; i++) {
+            if(readers[i] != null) {
+              readers[i].close();
+            }
+          }
+          
+        } finally {
+          locking.releaseReadLock();
+          scannerClosed = true;
+        }
+      }
+    }
+  }
+}

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java Tue Apr  3 13:34:28 2007
@@ -0,0 +1,378 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.*;
+
+import java.io.*;
+import java.util.*;
+
+/*******************************************************************************
+ * Each HStore maintains a bunch of different data files.
+ *
+ * The filename is a mix of the parent dir, the region name, the column family 
+ * name, and the file identifier.
+ * 
+ * This class handles all that path-building stuff for you.
+ ******************************************************************************/
+public class HStoreFile implements HConstants, WritableComparable {
+  public static final byte INFO_SEQ_NUM = 0;
+  public static final String HSTORE_DATFILE_PREFIX = "mapfile.dat.";
+  public static final String HSTORE_INFOFILE_PREFIX = "mapfile.info.";
+  public static final String HSTORE_DATFILE_DIR = "mapfiles";
+  public static final String HSTORE_INFO_DIR = "info";
+  static Random rand = new Random();
+
+  Path dir;
+  Text regionName;
+  Text colFamily;
+  long fileId;
+  Configuration conf;
+
+  /**
+   * An HStoreFile tracks 4 things: its parent dir, the region identifier, the 
+   * column family, and the file identifier.  If you know those four things, you
+   * know how to obtain the right HStoreFile.
+   *
+   * When merging or splitting HRegions, we might want to modify one of the 
+   * params for an HStoreFile (effectively moving it elsewhere).
+   */
+  public HStoreFile(Configuration conf) {
+    this.conf = conf;
+    this.dir = new Path("");
+    this.regionName = new Text();
+    this.colFamily = new Text();
+    this.fileId = 0;
+  }
+  
+  public HStoreFile(Configuration conf, Path dir, Text regionName, 
+      Text colFamily, long fileId) {
+    
+    this.conf = conf;
+    this.dir = dir;
+    this.regionName = new Text(regionName);
+    this.colFamily = new Text(colFamily);
+    this.fileId = fileId;
+  }
+
+  // Get the individual components
+  
+  public Path getDir() {
+    return dir;
+  }
+  
+  public Text getRegionName() {
+    return regionName;
+  }
+  
+  public Text getColFamily() {
+    return colFamily;
+  }
+  
+  public long fileId() {
+    return fileId;
+  }
+
+  // Build full filenames from those components
+  
+  public Path getMapFilePath() {
+    return new Path(HStoreFile.getMapDir(dir, regionName, colFamily), 
+        HSTORE_DATFILE_PREFIX + fileId);
+  }
+  
+  public Path getInfoFilePath() {
+    return new Path(HStoreFile.getInfoDir(dir, regionName, colFamily), 
+        HSTORE_INFOFILE_PREFIX + fileId);
+  }
+
+  // Static methods to build partial paths to internal directories.  Useful for 
+  // HStore construction and log-rebuilding.
+  
+  public static Path getMapDir(Path dir, Text regionName, Text colFamily) {
+    return new Path(dir, new Path(HREGIONDIR_PREFIX + regionName, 
+        new Path(colFamily.toString(), HSTORE_DATFILE_DIR)));
+  }
+
+  public static Path getInfoDir(Path dir, Text regionName, Text colFamily) {
+    return new Path(dir, new Path(HREGIONDIR_PREFIX + regionName, 
+        new Path(colFamily.toString(), HSTORE_INFO_DIR)));
+  }
+
+  public static Path getHStoreDir(Path dir, Text regionName, Text colFamily) {
+    return new Path(dir, new Path(HREGIONDIR_PREFIX + regionName, 
+        colFamily.toString()));
+  }
+
+  public static Path getHRegionDir(Path dir, Text regionName) {
+    return new Path(dir, new Path(HREGIONDIR_PREFIX + regionName));
+  }
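+
+  // For example (hypothetical names): with dir "/hbase", region "r1", family 
+  // "anchor" and fileId 123, the helpers above produce the data file
+  //   /hbase/<HREGIONDIR_PREFIX>r1/anchor/mapfiles/mapfile.dat.123
+  // and the matching info file
+  //   /hbase/<HREGIONDIR_PREFIX>r1/anchor/info/mapfile.info.123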
+
+  /**
+   * Obtain a brand-new randomly-named HStoreFile.  Checks the filesystem to 
+   * make sure the chosen file id is not already in use.
+   */
+  static HStoreFile obtainNewHStoreFile(Configuration conf, Path dir, 
+      Text regionName, Text colFamily, FileSystem fs) throws IOException {
+    
+    Path mapdir = HStoreFile.getMapDir(dir, regionName, colFamily);
+    long fileId = Math.abs(rand.nextLong());
+
+    Path testpath1 = new Path(mapdir, HSTORE_DATFILE_PREFIX + fileId);
+    Path testpath2 = new Path(mapdir, HSTORE_INFOFILE_PREFIX + fileId);
+    while(fs.exists(testpath1) || fs.exists(testpath2)) {
+      fileId = Math.abs(rand.nextLong());
+      testpath1 = new Path(mapdir, HSTORE_DATFILE_PREFIX + fileId);
+      testpath2 = new Path(mapdir, HSTORE_INFOFILE_PREFIX + fileId);
+    }
+    return new HStoreFile(conf, dir, regionName, colFamily, fileId);
+  }
+
+  /**
+   * Create a series of HStoreFiles loaded from the given directory.
+   * 
+   * There must be a matching 'mapdir' and 'loginfo' pair of files.
+   * If only one exists, we'll delete it.
+   */
+  static Vector<HStoreFile> loadHStoreFiles(Configuration conf, Path dir, 
+      Text regionName, Text colFamily, FileSystem fs) throws IOException {
+    
+    Vector<HStoreFile> results = new Vector<HStoreFile>();
+    Path mapdir = HStoreFile.getMapDir(dir, regionName, colFamily);
+
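+    // First pass: every data file must have a matching info file.  Data files 
+    // whose info file is missing are deleted rather than loaded.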
+    Path datfiles[] = fs.listPaths(mapdir);
+    for(int i = 0; i < datfiles.length; i++) {
+      String name = datfiles[i].getName();
+      
+      if(name.startsWith(HSTORE_DATFILE_PREFIX)) {
+        Long fileId = Long.parseLong(name.substring(HSTORE_DATFILE_PREFIX.length()));
+        HStoreFile curfile = new HStoreFile(conf, dir, regionName, colFamily, fileId);
+        Path mapfile = curfile.getMapFilePath();
+        Path infofile = curfile.getInfoFilePath();
+        
+        if(fs.exists(infofile)) {
+          results.add(curfile);
+          
+        } else {
+          fs.delete(mapfile);
+        }
+      }
+    }
+
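+    // Second pass: delete any orphaned info file whose data file is missing.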
+    Path infodir = HStoreFile.getInfoDir(dir, regionName, colFamily);
+    Path infofiles[] = fs.listPaths(infodir);
+    for(int i = 0; i < infofiles.length; i++) {
+      String name = infofiles[i].getName();
+      
+      if(name.startsWith(HSTORE_INFOFILE_PREFIX)) {
+        long fileId = Long.parseLong(name.substring(HSTORE_INFOFILE_PREFIX.length()));
+        HStoreFile curfile = new HStoreFile(conf, dir, regionName, colFamily, fileId);
+        Path mapfile = curfile.getMapFilePath();
+        
+        if(! fs.exists(mapfile)) {
+          fs.delete(curfile.getInfoFilePath());
+        }
+      }
+    }
+    return results;
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // File handling
+  //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Break this HStoreFile into two new parts, which live in different 
+   * brand-new HRegions.
+   */
+  public void splitStoreFile(Text midKey, HStoreFile dstA, HStoreFile dstB,
+      FileSystem fs, Configuration conf) throws IOException {
+
+    // Copy the appropriate tuples to one MapFile or the other.
+
+    MapFile.Reader in = new MapFile.Reader(fs, getMapFilePath().toString(), conf);
+    try {
+      MapFile.Writer outA = new MapFile.Writer(conf, fs, 
+          dstA.getMapFilePath().toString(), HStoreKey.class, BytesWritable.class);
+      
+      try {
+        MapFile.Writer outB = new MapFile.Writer(conf, fs, 
+            dstB.getMapFilePath().toString(), HStoreKey.class, BytesWritable.class);
+        
+        try {
+          HStoreKey readkey = new HStoreKey();
+          BytesWritable readval = new BytesWritable();
+          
+          while(in.next(readkey, readval)) {
+            Text key = readkey.getRow();
+            
+            if(key.compareTo(midKey) < 0) {
+              outA.append(readkey, readval);
+              
+            } else {
+              outB.append(readkey, readval);
+            }
+          }
+          
+        } finally {
+          outB.close();
+        }
+        
+      } finally {
+        outA.close();
+      }
+      
+    } finally {
+      in.close();
+    }
+
+    // Build an InfoFile for each output
+
+    long seqid = loadInfo(fs);
+    dstA.writeInfo(fs, seqid);
+    dstB.writeInfo(fs, seqid);
+  }
+
+  /**
+   * Write all the contents of the given source HStoreFiles into this HStoreFile.
+   * We are merging multiple regions into a single new one.
+   */
+  public void mergeStoreFiles(Vector<HStoreFile> srcFiles, FileSystem fs, 
+      Configuration conf) throws IOException {
+
+    // Copy all the source MapFile tuples into this HSF's MapFile
+
+    MapFile.Writer out = new MapFile.Writer(conf, fs, getMapFilePath().toString(),
+        HStoreKey.class, BytesWritable.class);
+    
+    try {
+      for(Iterator<HStoreFile> it = srcFiles.iterator(); it.hasNext(); ) {
+        HStoreFile src = it.next();
+        MapFile.Reader in = new MapFile.Reader(fs, src.getMapFilePath().toString(), conf);
+        
+        try {
+          HStoreKey readkey = new HStoreKey();
+          BytesWritable readval = new BytesWritable();
+          while(in.next(readkey, readval)) {
+            out.append(readkey, readval);
+          }
+          
+        } finally {
+          in.close();
+        }
+      }
+      
+    } finally {
+      out.close();
+    }
+
+    // Build a unified InfoFile from the source InfoFiles.
+
+    long unifiedSeqId = -1;
+    for(Iterator<HStoreFile> it = srcFiles.iterator(); it.hasNext(); ) {
+      HStoreFile hsf = it.next();
+      long curSeqId = hsf.loadInfo(fs);
+      
+      if(curSeqId > unifiedSeqId) {
+        unifiedSeqId = curSeqId;
+      }
+    }
+    writeInfo(fs, unifiedSeqId);
+  }
+
+  /** Read the log sequence number stored in this file's info file. */
+  public long loadInfo(FileSystem fs) throws IOException {
+    Path p = getInfoFilePath();
+    DataInputStream in = new DataInputStream(fs.open(p));
+    
+    try {
+      byte flag = in.readByte();
+      if(flag == INFO_SEQ_NUM) {
+        return in.readLong();
+        
+      } else {
+        throw new IOException("Cannot process log file: " + p);
+      }
+      
+    } finally {
+      in.close();
+    }
+  }
+  
+  /** Write the given log sequence number to this file's info file */
+  public void writeInfo(FileSystem fs, long infonum) throws IOException {
+    Path p = getInfoFilePath();
+    DataOutputStream out = new DataOutputStream(fs.create(p));
+    
+    try {
+      out.writeByte(INFO_SEQ_NUM);
+      out.writeLong(infonum);
+      
+    } finally {
+      out.close();
+    }
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Writable
+  //////////////////////////////////////////////////////////////////////////////
+
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(dir.toString());
+    regionName.write(out);
+    colFamily.write(out);
+    out.writeLong(fileId);
+  }
+  
+  public void readFields(DataInput in) throws IOException {
+    this.dir = new Path(in.readUTF());
+    this.regionName.readFields(in);
+    this.colFamily.readFields(in);
+    this.fileId = in.readLong();
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Comparable
+  //////////////////////////////////////////////////////////////////////////////
+
+  public int compareTo(Object o) {
+    HStoreFile other = (HStoreFile) o;
+    int result = this.dir.compareTo(other.dir);    
+    if(result == 0) {
+      result = this.regionName.compareTo(other.regionName);
+    }
+    if(result == 0) {
+      result = this.colFamily.compareTo(other.colFamily);
+    }    
+    if(result == 0) {
+      if(this.fileId < other.fileId) {
+        result = -1;
+        
+      } else if(this.fileId > other.fileId) {
+        result = 1;
+      }
+    }
+    return result;
+  }
+
+  
+  public boolean equals(Object o) {
+    return this.compareTo(o) == 0;
+  }
+}
+
+

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java Tue Apr  3 13:34:28 2007
@@ -0,0 +1,158 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.io.*;
+
+import java.io.*;
+
+/*******************************************************************************
+ * A key identifying a stored cell: a row, a column, and a timestamp
+ ******************************************************************************/
+public class HStoreKey implements WritableComparable {
+  public static final byte[] DELETE_BYTES = "HSTOREKEY::DELETEVAL".getBytes();
+  public static final byte[] COMPLETE_CACHEFLUSH = "HSTOREKEY::CACHEFLUSH".getBytes();
+
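+  /**
+   * Extract the column family from a fully qualified column name.  For a 
+   * (hypothetical) column such as "anchor:foo", this returns "anchor".
+   */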
+  public static Text extractFamily(Text col) throws IOException {
+    String column = col.toString();
+    int colpos = column.indexOf(":");
+    if(colpos < 0) {
+      throw new IllegalArgumentException("Illegal column name has no family indicator: " + column);
+    }
+    return new Text(column.substring(0, colpos));
+  }
+
+  Text row;
+  Text column;
+  long timestamp;
+
+  public HStoreKey() {
+    this.row = new Text();
+    this.column = new Text();
+    this.timestamp = Long.MAX_VALUE;
+  }
+  
+  public HStoreKey(Text row) {
+    this.row = new Text(row);
+    this.column = new Text();
+    this.timestamp = Long.MAX_VALUE;
+  }
+  
+  public HStoreKey(Text row, long timestamp) {
+    this.row = new Text(row);
+    this.column = new Text();
+    this.timestamp = timestamp;
+  }
+  
+  public HStoreKey(Text row, Text column) {
+    this.row = new Text(row);
+    this.column = new Text(column);
+    this.timestamp = Long.MAX_VALUE;
+  }
+  
+  public HStoreKey(Text row, Text column, long timestamp) {
+    this.row = new Text(row);
+    this.column = new Text(column);
+    this.timestamp = timestamp;
+  }
+  
+  public void setRow(Text newrow) {
+    this.row.set(newrow);
+  }
+  
+  public void setColumn(Text newcol) {
+    this.column.set(newcol);
+  }
+  
+  public void setVersion(long timestamp) {
+    this.timestamp = timestamp;
+  }
+  
+  public Text getRow() {
+    return row;
+  }
+  
+  public Text getColumn() {
+    return column;
+  }
+  
+  public long getTimestamp() {
+    return timestamp;
+  }
+  
+  public boolean matchesRowCol(HStoreKey other) {
+    if(this.row.compareTo(other.row) == 0 &&
+        this.column.compareTo(other.column) == 0) {
+      return true;
+      
+    } else {
+      return false;
+    }
+  }
+  
+  public boolean matchesWithoutColumn(HStoreKey other) {
+    if((this.row.compareTo(other.row) == 0) &&
+        (this.timestamp >= other.getTimestamp())) {
+      return true;
+      
+    } else {
+      return false;
+    }
+  }
+  
+  public String toString() {
+    return row.toString() + "/" + column.toString() + "/" + timestamp;
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Comparable
+  //////////////////////////////////////////////////////////////////////////////
+
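+  /**
+   * Keys order by row, then column, then timestamp.  Timestamps sort in 
+   * descending order, so the newest version of a cell always sorts first.
+   */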
+  public int compareTo(Object o) {
+    HStoreKey other = (HStoreKey) o;
+    int result = this.row.compareTo(other.row);
+    if(result == 0) {
+      result = this.column.compareTo(other.column);
+      
+      if(result == 0) {
+        if(this.timestamp < other.timestamp) {
+          result = 1;
+          
+        } else if(this.timestamp > other.timestamp) {
+          result = -1;
+        }
+      }
+    }
+    return result;
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Writable
+  //////////////////////////////////////////////////////////////////////////////
+
+  public void write(DataOutput out) throws IOException {
+    row.write(out);
+    column.write(out);
+    out.writeLong(timestamp);
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    row.readFields(in);
+    column.readFields(in);
+    timestamp = in.readLong();
+  }
+}
+

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java Tue Apr  3 13:34:28 2007
@@ -0,0 +1,123 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.io.*;
+
+import java.io.*;
+import java.util.*;
+
+/*******************************************************************************
+ * HTableDescriptor contains the metadata for an HTable: its name, the maximum 
+ * number of versions to keep, and its column families.
+ ******************************************************************************/
+public class HTableDescriptor implements WritableComparable {
+  Text name;
+  int maxVersions;
+  TreeSet<Text> families = new TreeSet<Text>();
+
+  public HTableDescriptor() {
+    this.name = new Text();
+    this.families.clear();
+  }
+
+  public HTableDescriptor(String name, int maxVersions) {
+    this.name = new Text(name);
+    this.maxVersions = maxVersions;
+  }
+
+  public Text getName() {
+    return name;
+  }
+
+  public int getMaxVersions() {
+    return maxVersions;
+  }
+
+  /** Add a column family */
+  public void addFamily(Text family) {
+    families.add(family);
+  }
+
+  /** Do we contain a given column family? */
+  public boolean hasFamily(Text family) {
+    if(families.contains(family)) {
+      return true;
+      
+    } else {
+      return false;
+    }
+  }
+
+  /** All the column families in this table. */
+  public TreeSet<Text> families() {
+    return families;
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Writable
+  //////////////////////////////////////////////////////////////////////////////
+
+  public void write(DataOutput out) throws IOException {
+    name.write(out);
+    out.writeInt(maxVersions);
+    out.writeInt(families.size());
+    for(Iterator<Text> it = families.iterator(); it.hasNext(); ) {
+      it.next().write(out);
+    }
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    this.name.readFields(in);
+    this.maxVersions = in.readInt();
+    int numCols = in.readInt();
+    families.clear();
+    for(int i = 0; i < numCols; i++) {
+      Text t = new Text();
+      t.readFields(in);
+      families.add(t);
+    }
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Comparable
+  //////////////////////////////////////////////////////////////////////////////
+
+  public int compareTo(Object o) {
+    HTableDescriptor htd = (HTableDescriptor) o;
+    int result = name.compareTo(htd.name);
+    if(result == 0) {
+      result = maxVersions - htd.maxVersions;
+    }
+    
+    if(result == 0) {
+      result = families.size() - htd.families.size();
+    }
+    
+    if(result == 0) {
+      Iterator<Text> it2 = htd.families.iterator();
+      for(Iterator<Text> it = families.iterator(); it.hasNext(); ) {
+        Text family1 = it.next();
+        Text family2 = it2.next();
+        result = family1.compareTo(family2);
+        if(result != 0) {
+          return result;
+        }
+      }
+    }
+    return result;
+  }
+}

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LabelledData.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LabelledData.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LabelledData.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LabelledData.java Tue Apr  3 13:34:28 2007
@@ -0,0 +1,60 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.io.*;
+
+import java.io.*;
+
+/*******************************************************************************
+ * LabelledData is just a data pair.
+ * It includes a Text label and some associated data.
+ ******************************************************************************/
+public class LabelledData implements Writable {
+  Text label;
+  BytesWritable data;
+
+  public LabelledData() {
+    this.label = new Text();
+    this.data = new BytesWritable();
+  }
+
+  public LabelledData(Text label, byte[] data) {
+    this.label = new Text(label);
+    this.data = new BytesWritable(data);
+  }
+
+  public Text getLabel() {
+    return label;
+  }
+
+  public BytesWritable getData() {
+    return data;
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Writable
+  //////////////////////////////////////////////////////////////////////////////
+
+  public void write(DataOutput out) throws IOException {
+    label.write(out);
+    data.write(out);
+  }
+  
+  public void readFields(DataInput in) throws IOException {
+    label.readFields(in);
+    data.readFields(in);
+  }
+}
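
A short round-trip sketch for LabelledData, assuming the null-label fix in the two-argument constructor; the label and value bytes are made up for illustration:

  import org.apache.hadoop.io.Text;
  import java.io.*;

  public class LabelledDataExample {                      // hypothetical driver class
    public static void main(String[] args) throws IOException {
      LabelledData cell = new LabelledData(new Text("anchor:example"), "some value".getBytes());

      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      cell.write(new DataOutputStream(buf));

      LabelledData copy = new LabelledData();
      copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
      System.out.println(copy.getLabel() + " -> " + copy.getData().getSize() + " bytes");
    }
  }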

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LeaseListener.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LeaseListener.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LeaseListener.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LeaseListener.java Tue Apr  3 13:34:28 2007
@@ -0,0 +1,41 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+
+/*******************************************************************************
+ * LeaseListener is a small class meant to be overridden by users of the Leases 
+ * class.
+ *
+ * It receives events from the Leases class about the status of its accompanying
+ * lease.  Users of the Leases class can use a LeaseListener subclass to, for 
+ * example, clean up resources after a lease has expired.
+ ******************************************************************************/
+public abstract class LeaseListener {
+  public LeaseListener() {
+  }
+
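+  /** When a lease is renewed, this method is called. */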
+  public void leaseRenewed() {
+  }
+
+  /** When the user cancels a lease, this method is called. */
+  public void leaseCancelled() {
+  }
+
+  /** When a lease expires, this method is called. */
+  public void leaseExpired() {
+  }
+}
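
One way a server class might subclass LeaseListener to release resources on expiry; the class name, scanner id, and logging here are assumptions for illustration only:

  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;

  public class ScannerLeaseListener extends LeaseListener {   // hypothetical subclass
    private static final Log LOG = LogFactory.getLog(ScannerLeaseListener.class);
    private final String scannerId;

    public ScannerLeaseListener(String scannerId) {
      this.scannerId = scannerId;
    }

    public void leaseExpired() {
      // In a real server this is where the held scanner would be closed.
      LOG.info("Lease expired, closing scanner " + scannerId);
    }
  }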

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java Tue Apr  3 13:34:28 2007
@@ -0,0 +1,220 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.io.*;
+
+import java.io.*;
+import java.util.*;
+
+/*******************************************************************************
+ * Leases
+ *
+ * There are several server classes in HBase that need to track external clients
+ * that occasionally send heartbeats.
+ * 
+ * These external clients hold resources in the server class.  Those resources 
+ * need to be released if the external client fails to send a heartbeat after 
+ * some interval of time passes.
+ *
+ * The Leases class is a general reusable class for this kind of pattern.
+ *
+ * An instance of the Leases class will create a thread to do its dirty work.  
+ * You should close() the instance if you want to clean up the thread properly.
+ ******************************************************************************/
+public class Leases {
+  long leasePeriod;
+  long leaseCheckFrequency;
+  LeaseMonitor leaseMonitor;
+  Thread leaseMonitorThread;
+  TreeMap<Text, Lease> leases = new TreeMap<Text, Lease>();
+  TreeSet<Lease> sortedLeases = new TreeSet<Lease>();
+  boolean running = true;
+
+  /**
+   * Construct a lease manager.  Both the lease period and the frequency with 
+   * which expired leases are checked for are given in milliseconds.
+   */
+  public Leases(long leasePeriod, long leaseCheckFrequency) {
+    this.leasePeriod = leasePeriod;
+    this.leaseCheckFrequency = leaseCheckFrequency;
+
+    this.leaseMonitor = new LeaseMonitor();
+    this.leaseMonitorThread = new Thread(leaseMonitor);
+    leaseMonitorThread.start();
+  }
+
+  /**
+   * Shut down this Leases outfit.  All pending leases will be destroyed, 
+   * without any cancellation calls.
+   */
+  public void close() {
+    this.running = false;
+    try {
+      this.leaseMonitorThread.join();
+    } catch (InterruptedException iex) {
+    }
+    synchronized(leases) {
+      synchronized(sortedLeases) {
+        leases.clear();
+        sortedLeases.clear();
+      }
+    }
+  }
+
+  /** A client obtains a lease... */
+  public void createLease(Text holderId, Text resourceId, LeaseListener listener) throws IOException {
+    synchronized(leases) {
+      synchronized(sortedLeases) {
+        Lease lease = new Lease(holderId, resourceId, listener);
+        Text leaseId = lease.getLeaseId();
+        if(leases.get(leaseId) != null) {
+          throw new IOException("Impossible state for createLease(): Lease for holderId " + holderId + " and resourceId " + resourceId + " is still held.");
+        }
+        leases.put(leaseId, lease);
+        sortedLeases.add(lease);
+      }
+    }
+  }
+  
+  /** A client renews a lease... */
+  public void renewLease(Text holderId, Text resourceId) throws IOException {
+    synchronized(leases) {
+      synchronized(sortedLeases) {
+        Text leaseId = createLeaseId(holderId, resourceId);
+        Lease lease = leases.get(leaseId);
+        if(lease == null) {
+          
+          // It's possible that someone tries to renew the lease, but 
+          // it just expired a moment ago.  So fail.
+          
+          throw new IOException("Cannot renew lease is not held (holderId=" + holderId + ", resourceId=" + resourceId + ")");
+        }
+        
+        sortedLeases.remove(lease);
+        lease.renew();
+        sortedLeases.add(lease);
+      }
+    }
+  }
+
+  /** A client explicitly cancels a lease.  The lease-cleanup method is not called. */
+  public void cancelLease(Text holderId, Text resourceId) throws IOException {
+    synchronized(leases) {
+      synchronized(sortedLeases) {
+        Text leaseId = createLeaseId(holderId, resourceId);
+        Lease lease = leases.get(leaseId);
+        if(lease == null) {
+          
+          // It's possible that someone tries to cancel the lease, but 
+          // it just expired a moment ago.  So fail.
+          
+          throw new IOException("Cannot cancel lease that is not held (holderId=" + holderId + ", resourceId=" + resourceId + ")");
+        }
+        
+        sortedLeases.remove(lease);
+        leases.remove(leaseId);
+
+        lease.cancelled();
+      }
+    }        
+  }
+
+  /** LeaseMonitor is a thread that expires Leases that go on too long. */
+  class LeaseMonitor implements Runnable {
+    public void run() {
+      while(running) {
+        synchronized(leases) {
+          synchronized(sortedLeases) {
+            Lease top;
+            while((sortedLeases.size() > 0)
+                && ((top = sortedLeases.first()) != null)) {
+              
+              if(top.shouldExpire()) {
+                leases.remove(top.getLeaseId());
+                sortedLeases.remove(top);
+
+                top.expired();
+              
+              } else {
+                break;
+              }
+            }
+          }
+        }
+        try {
+          Thread.sleep(leaseCheckFrequency);
+        } catch (InterruptedException ie) {
+        }
+      }
+    }
+  }
+
+  /** Create a lease id out of the holder and resource ids. */
+  Text createLeaseId(Text holderId, Text resourceId) {
+    return new Text("_" + holderId + "/" + resourceId + "_");
+  }
+
+  /** This class tracks a single Lease. */
+  class Lease implements Comparable {
+    Text holderId;
+    Text resourceId;
+    LeaseListener listener;
+    long lastUpdate;
+
+    public Lease(Text holderId, Text resourceId, LeaseListener listener) {
+      this.holderId = holderId;
+      this.resourceId = resourceId;
+      this.listener = listener;
+      renew();
+    }
+    
+    public Text getLeaseId() {
+      return createLeaseId(holderId, resourceId);
+    }
+    
+    public boolean shouldExpire() {
+      return (System.currentTimeMillis() - lastUpdate > leasePeriod);
+    }
+    
+    public void renew() {
+      this.lastUpdate = System.currentTimeMillis();
+      listener.leaseRenewed();
+    }
+    
+    public void cancelled() {
+      listener.leaseCancelled();
+    }
+    
+    public void expired() {
+      listener.leaseExpired();
+    }
+    
+    //////////////////////////////////////////////////////////////////////////////
+    // Comparable
+    //////////////////////////////////////////////////////////////////////////////
+
+    public int compareTo(Object o) {
+      Lease other = (Lease) o;
+      if(this.lastUpdate < other.lastUpdate) {
+        return -1;
+        
+      } else if(this.lastUpdate > other.lastUpdate) {
+        return 1;
+        
+      } else {
+        return this.getLeaseId().compareTo(other.getLeaseId());
+      }
+    }
+  }
+}
+
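
A sketch of the lease lifecycle end to end, using made-up periods, holder/resource ids, and an anonymous listener; only the Leases and LeaseListener APIs shown above are assumed:

  import org.apache.hadoop.io.Text;
  import java.io.IOException;

  public class LeasesExample {                              // hypothetical driver class
    public static void main(String[] args) throws IOException {
      // One-minute leases, checked once a second -- illustrative values.
      Leases leases = new Leases(60 * 1000, 1000);

      Text holder = new Text("client-1");                   // hypothetical ids
      Text resource = new Text("scanner-42");

      leases.createLease(holder, resource, new LeaseListener() {
        public void leaseExpired() {
          System.out.println("lease expired; release whatever the client held");
        }
      });

      // Each client heartbeat maps to a renewal...
      leases.renewLease(holder, resource);

      // ...an explicit cancel skips the expiry callback...
      leases.cancelLease(holder, resource);

      // ...and close() stops the monitor thread when the server shuts down.
      leases.close();
    }
  }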

Added: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/Environment.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/Environment.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/Environment.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/Environment.java Tue Apr  3 13:34:28 2007
@@ -0,0 +1,58 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.log4j.Level;
+/**
+ * Retrieve environment variables that control debugging and logging environment
+ */
+public class Environment {
+  public static boolean debugging = false;
+  public static Level logLevel = Level.INFO;
+  
+  private Environment() {}                           // Not instantiable
+  
+  public static void getenv() {
+    String value = null;
+    
+    value = System.getenv("DEBUGGING");
+    if(value != null && value.equalsIgnoreCase("TRUE")) {
+      debugging = true;
+    }
+    
+    value = System.getenv("LOGGING_LEVEL");
+    if(value != null && value.length() != 0) {
+      if(value.equalsIgnoreCase("ALL")) {
+        logLevel = Level.ALL;
+      } else if(value.equalsIgnoreCase("DEBUG")) {
+        logLevel = Level.DEBUG;
+      } else if(value.equalsIgnoreCase("ERROR")) {
+        logLevel = Level.ERROR;
+      } else if(value.equalsIgnoreCase("FATAL")) {
+        logLevel = Level.FATAL;
+      } else if(value.equalsIgnoreCase("INFO")) {
+        logLevel = Level.INFO;
+      } else if(value.equalsIgnoreCase("OFF")) {
+        logLevel = Level.OFF;
+      } else if(value.equalsIgnoreCase("TRACE")) {
+        logLevel = Level.TRACE;
+      } else if(value.equalsIgnoreCase("WARN")) {
+        logLevel = Level.WARN;
+      }
+    }
+  }
+  
+}
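
The settings read above are meant to be handed to log4j; a minimal sketch of how a test might apply them, with the root-logger usage being an assumption rather than something the patch prescribes:

  import org.apache.log4j.Logger;

  public class EnvironmentExample {                         // hypothetical test driver
    public static void main(String[] args) {
      // Reads DEBUGGING and LOGGING_LEVEL from the process environment.
      Environment.getenv();

      Logger root = Logger.getRootLogger();
      root.setLevel(Environment.logLevel);

      if (Environment.debugging) {
        root.info("debugging enabled via DEBUGGING=TRUE");
      }
    }
  }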