You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2008/07/17 09:17:28 UTC

svn commit: r677517 [5/6] - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/ipc/ src/java/org/apache/hadoop/hbase/master/ src/java/org/apache/hadoop/hbase/regionserv...

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HStoreFile.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HStoreFile.java?rev=677517&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HStoreFile.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HStoreFile.java Thu Jul 17 00:17:26 2008
@@ -0,0 +1,1032 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util.migration.v5;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.BlockFSInputStream;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.onelab.filter.BloomFilter;
+import org.onelab.filter.Key;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HStoreKey;
+
+/**
+ * A HStore data file.  HStores usually have one or more of these files.  They
+ * are produced by flushing the memcache to disk.
+ *
+ * <p>Each HStore maintains a bunch of different data files. The filename is a
+ * mix of the parent dir, the region name, the column name, and a file
+ * identifier. The name may also be a reference to a store file located
+ * elsewhere. This class handles all that path-building stuff for you.
+ * 
+ * <p>An HStoreFile usually tracks 4 things: its parent dir, the region
+ * identifier, the column family, and the file identifier.  If you know those
+ * four things, you know how to obtain the right HStoreFile.  HStoreFiles may
+ * also reference store files in another region, serving either the
+ * top half or the bottom half of the remote file.  Such references
+ * are made when fast splitting regions.
+ * 
+ * <p>Plain HStoreFiles are named for a randomly generated id as in:
+ * <code>1278437856009925445</code>  A file by this name is made in both the
+ * <code>mapfiles</code> and <code>info</code> subdirectories of a
+ * HStore columnfamily directory: E.g. If the column family is 'anchor:', then
+ * under the region directory there is a subdirectory named 'anchor' within
+ * which is a 'mapfiles' and 'info' subdirectory.  In each will be found a
+ * file named something like <code>1278437856009925445</code>, one to hold the
+ * data in 'mapfiles' and one under 'info' that holds the sequence id for this
+ * store file.
+ * 
+ * <p>References to store files located over in some other region look like
+ * this:
+ * <code>1278437856009925445.hbaserepository,qAReLZD-OyQORZWq_vqR1k==,959247014679548184</code>:
+ * i.e. an id followed by the name of the referenced region.  The data
+ * ('mapfiles') of HStoreFile references are empty. The accompanying
+ * <code>info</code> file contains the
+ * midkey, the id of the remote store we're referencing and whether we're
+ * to serve the top or bottom region of the remote store file.  Note, a region
+ * is not splittable if it has instances of store file references (References
+ * are cleaned up by compactions).
+ * 
+ * <p>When merging or splitting HRegions, we might want to modify one of the 
+ * params for an HStoreFile (effectively moving it elsewhere).
+ */
+public class HStoreFile implements HConstants {
+  // Logger shared by HStoreFile and its nested reader/writer classes.
+  static final Log LOG = LogFactory.getLog(HStoreFile.class.getName());
+  // Flag byte written ahead of the long value held in an info file.
+  static final byte INFO_SEQ_NUM = 0;
+  // Names of the subdirectories found under a column family directory.
+  static final String HSTORE_DATFILE_DIR = "mapfiles";
+  static final String HSTORE_INFO_DIR = "info";
+  static final String HSTORE_FILTER_DIR = "filter";
+  
+  /** 
+   * For split HStoreFiles, specifies if the file covers the lower half or
+   * the upper half of the key range.  A {@link Reference} serializes this
+   * value as a boolean that is true for <code>top</code>.
+   */
+  public static enum Range {
+    /** HStoreFile contains upper half of key range */
+    top,
+    /** HStoreFile contains lower half of key range */
+    bottom
+  }
+  
+  // Source of randomly generated file ids.
+  private final static Random rand = new Random();
+
+  // Parent of the region directory.
+  private final Path basedir;
+  // File-name-friendly encoding of the region name.
+  private final int encodedRegionName;
+  // Column family this store file belongs to.
+  private final byte [] colFamily;
+  // Identifier of this file inside its column family directory.
+  private final long fileId;
+  private final HBaseConfiguration conf;
+  private final FileSystem fs;
+  // Non-null only when this store file is a reference to another store file.
+  private final Reference reference;
+
+  /**
+   * Constructor that fully initializes the object
+   * @param conf Configuration object
+   * @param fs file system; used to probe for an unused file id
+   * @param basedir qualified path that is parent of region directory
+   * @param encodedRegionName file name friendly name of the region
+   * @param colFamily name of the column family
+   * @param fileId file identifier; pass -1 to have an unused, non-negative
+   * id generated
+   * @param ref Reference to another HStoreFile; null if this is not a
+   * reference.
+   * @throws IOException
+   */
+  HStoreFile(HBaseConfiguration conf, FileSystem fs, Path basedir,
+      int encodedRegionName, byte [] colFamily, long fileId,
+      final Reference ref) throws IOException {
+    this.conf = conf;
+    this.fs = fs;
+    this.basedir = basedir;
+    this.encodedRegionName = encodedRegionName;
+    this.colFamily = colFamily;
+    
+    long id = fileId;
+    if (id == -1) {
+      Path mapdir = HStoreFile.getMapDir(basedir, encodedRegionName, colFamily);
+      Path testpath = null;
+      do {
+        // Clear the sign bit instead of calling Math.abs(): the absolute
+        // value of Long.MIN_VALUE is still negative.
+        id = rand.nextLong() & Long.MAX_VALUE;
+        // Probe the same name the plain map file path builders produce.
+        testpath = new Path(mapdir,
+          createHStoreFilename(id, HRegionInfo.NO_HASH));
+      } while(fs.exists(testpath));
+    }
+    this.fileId = id;
+    
+    // If a reference, construction does not write the pointer files.  Thats
+    // done by invocations of writeReferenceFiles(fs).  Happens at fast
+    // split time.
+    this.reference = ref;
+  }
+
+  /** @return true if this store file is a reference to another store file */
+  boolean isReference() {
+    return this.reference != null;
+  }
+  
+  /** @return the reference passed at construction; null if not a reference */
+  Reference getReference() {
+    return this.reference;
+  }
+
+  /** @return the encoded region name */
+  int getEncodedRegionName() {
+    return encodedRegionName;
+  }
+
+  /** @return the column family name (the internal array, not a copy) */
+  byte [] getColFamily() {
+    return this.colFamily;
+  }
+
+  /** @return the identifier of this file */
+  long getFileId() {
+    return this.fileId;
+  }
+
+  // Build full filenames from those components
+  
+  /** @return path for MapFile */
+  Path getMapFilePath() {
+    if (isReference()) {
+      return getMapFilePath(encodedRegionName, fileId,
+          reference.getEncodedRegionName());
+    }
+    return getMapFilePath(this.encodedRegionName, fileId);
+  }
+
+  /*
+   * @param r Reference to resolve; may be null.
+   * @return path of the MapFile <code>r</code> points at or, when
+   * <code>r</code> is null, the path of this store file's own MapFile.
+   */
+  private Path getMapFilePath(final Reference r) {
+    return (r == null)?
+      getMapFilePath():
+      getMapFilePath(r.getEncodedRegionName(), r.getFileId());
+  }
+
+  private Path getMapFilePath(final int encodedName, final long fid) {
+    return getMapFilePath(encodedName, fid, HRegionInfo.NO_HASH);
+  }
+  
+  private Path getMapFilePath(final int encodedName, final long fid,
+      final int ern) {
+    return new Path(HStoreFile.getMapDir(basedir, encodedName, colFamily), 
+      createHStoreFilename(fid, ern));
+  }
+
+  /** @return path for info file */
+  Path getInfoFilePath() {
+    if (isReference()) {
+      return getInfoFilePath(encodedRegionName, fileId,
+          reference.getEncodedRegionName());
+ 
+    }
+    return getInfoFilePath(encodedRegionName, fileId);
+  }
+  
+  private Path getInfoFilePath(final int encodedName, final long fid) {
+    return getInfoFilePath(encodedName, fid, HRegionInfo.NO_HASH);
+  }
+  
+  private Path getInfoFilePath(final int encodedName, final long fid,
+      final int ern) {
+    return new Path(HStoreFile.getInfoDir(basedir, encodedName, colFamily), 
+      createHStoreFilename(fid, ern));
+  }
+
+  // File handling
+
+  /*
+   * Split by writing out the two passed store files, which are expected to
+   * be references to the top and bottom regions of the original store file.
+   * <code>dstA</code> is written before <code>dstB</code>.
+   * @param dstA first reference store file to write
+   * @param dstB second reference store file to write
+   * @param fs file system
+   * @throws IOException
+   */
+  void splitStoreFile(final HStoreFile dstA, final HStoreFile dstB,
+      final FileSystem fs)
+  throws IOException {
+    for (HStoreFile half: new HStoreFile [] {dstA, dstB}) {
+      half.writeReferenceFiles(fs);
+    }
+  }
+  
+  void writeReferenceFiles(final FileSystem fs)
+  throws IOException {
+    createOrFail(fs, getMapFilePath());
+    writeSplitInfo(fs);
+  }
+  
+  /*
+   * If reference, create and write the remote store file id, the midkey and
+   * whether we're going against the top file region of the referent out to
+   * the info file. 
+   * @param p Path to info file.
+   * @param hsf
+   * @param fs
+   * @throws IOException
+   */
+  private void writeSplitInfo(final FileSystem fs) throws IOException {
+    Path p = getInfoFilePath();
+    if (fs.exists(p)) {
+      throw new IOException("File already exists " + p.toString());
+    }
+    FSDataOutputStream out = fs.create(p);
+    try {
+      reference.write(out);
+    } finally {
+      out.close();
+   }
+  }
+
+  /**
+   * @see #writeSplitInfo(FileSystem fs)
+   */
+  static HStoreFile.Reference readSplitInfo(final Path p, final FileSystem fs)
+  throws IOException {
+    FSDataInputStream in = fs.open(p);
+    try {
+      HStoreFile.Reference r = new HStoreFile.Reference();
+      r.readFields(in);
+      return r;
+    } finally {
+      in.close();
+    }
+  }
+
+  private void createOrFail(final FileSystem fs, final Path p)
+  throws IOException {
+    if (fs.exists(p)) {
+      throw new IOException("File already exists " + p.toString());
+    }
+    if (!fs.createNewFile(p)) {
+      throw new IOException("Failed create of " + p);
+    }
+  }
+
+  /** 
+   * Reads in an info file
+   *
+   * @param fs file system
+   * @return The sequence id contained in the info file
+   * @throws IOException
+   */
+  long loadInfo(FileSystem fs) throws IOException {
+    Path p = null;
+    if (isReference()) {
+      p = getInfoFilePath(reference.getEncodedRegionName(), reference.getFileId());
+    } else {
+      p = getInfoFilePath();
+    }
+    DataInputStream in = new DataInputStream(fs.open(p));
+    try {
+      byte flag = in.readByte();
+      if(flag == INFO_SEQ_NUM) {
+        return in.readLong();
+      }
+      throw new IOException("Cannot process log file: " + p);
+    } finally {
+      in.close();
+    }
+  }
+  
+  /**
+   * Writes the file-identifier to disk
+   * 
+   * @param fs file system
+   * @param infonum file id
+   * @throws IOException
+   */
+  void writeInfo(FileSystem fs, long infonum) throws IOException {
+    Path p = getInfoFilePath();
+    FSDataOutputStream out = fs.create(p);
+    try {
+      out.writeByte(INFO_SEQ_NUM);
+      out.writeLong(infonum);
+    } finally {
+      out.close();
+    }
+  }
+  
+  /**
+   * Delete the store map file and then its info file.
+   * @throws IOException 
+   */
+  public void delete() throws IOException {
+    final Path [] doomed = new Path [] {getMapFilePath(), getInfoFilePath()};
+    for (Path p: doomed) {
+      fs.delete(p, true);
+    }
+  }
+  
+  /**
+   * Renames this store file's map file and info file to the paths of the
+   * passed <code>hsf</code>.  The info file is only renamed if the rename of
+   * the map file succeeded.
+   * @param fs
+   * @param hsf
+   * @return True if succeeded.
+   * @throws IOException if a source file is missing or the rename throws
+   */
+  public boolean rename(final FileSystem fs, final HStoreFile hsf)
+  throws IOException {
+    final Path mapSrc = getMapFilePath();
+    if (!fs.exists(mapSrc)) {
+      throw new FileNotFoundException(mapSrc.toString());
+    }
+    if (!fs.rename(mapSrc, hsf.getMapFilePath())) {
+      LOG.warn("Failed rename of " + mapSrc + " to " + hsf.getMapFilePath());
+      return false;
+    }
+    final Path infoSrc = getInfoFilePath();
+    if (!fs.exists(infoSrc)) {
+      throw new FileNotFoundException(infoSrc.toString());
+    }
+    final boolean renamed = fs.rename(infoSrc, hsf.getInfoFilePath());
+    if (!renamed) {
+      LOG.warn("Failed rename of " + infoSrc + " to " + hsf.getInfoFilePath());
+    }
+    return renamed;
+  }
+  
+  /**
+   * Get reader for the store file map file.  For a reference, the reader is
+   * opened on the referenced map file and serves only the referenced half.
+   * Client is responsible for closing file when done.
+   * @param fs
+   * @param bloomFilter If true, a bloom filter exists
+   * @param blockCacheEnabled If true, MapFile blocks should be cached.
+   * @return BloomFilterMapFile.Reader
+   * @throws IOException
+   */
+  public synchronized BloomFilterMapFile.Reader getReader(final FileSystem fs,
+      final boolean bloomFilter, final boolean blockCacheEnabled)
+  throws IOException {
+    if (!isReference()) {
+      return new BloomFilterMapFile.Reader(fs, getMapFilePath().toString(),
+          conf, bloomFilter, blockCacheEnabled);
+    }
+    final String referent = getMapFilePath(reference).toString();
+    return new HStoreFile.HalfMapFileReader(fs, referent, conf,
+        reference.getFileRegion(), reference.getMidkey(), bloomFilter,
+        blockCacheEnabled);
+  }
+
+  /**
+   * Get a store file writer.
+   * Client is responsible for closing file when done.
+   * @param fs
+   * @param compression Pass <code>SequenceFile.CompressionType.NONE</code>
+   * for none.
+   * @param bloomFilter If true, create a bloom filter
+   * @param nrows number of rows expected. Required if bloomFilter is true.
+   * @return MapFile.Writer
+   * @throws IOException if this store file is a reference (references
+   * cannot be written to) or the writer cannot be opened
+   */
+  public MapFile.Writer getWriter(final FileSystem fs,
+      final SequenceFile.CompressionType compression,
+      final boolean bloomFilter, int nrows)
+  throws IOException {
+    if (isReference()) {
+      // Trailing space keeps the two halves of the message from running
+      // together.
+      throw new IOException("Illegal Access: Cannot get a writer on a " +
+        "HStoreFile reference");
+    }
+    return new BloomFilterMapFile.Writer(conf, fs,
+      getMapFilePath().toString(), compression, bloomFilter, nrows);
+  }
+
+  /**
+   * @return Length of the store map data file.  If a reference, the result
+   * is an approximation: half the length of the referenced data file.
+   * @throws IOException
+   */
+  public long length() throws IOException {
+    final Path datafile =
+      new Path(getMapFilePath(reference), MapFile.DATA_FILE_NAME);
+    final long len =
+      datafile.getFileSystem(conf).getFileStatus(datafile).getLen();
+    if (isReference()) {
+      return len / 2;
+    }
+    return len;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString() {
+    // region/family/fileid, plus "-" and the reference if there is one.
+    final StringBuilder sb = new StringBuilder();
+    sb.append(encodedRegionName).append("/");
+    sb.append(Bytes.toString(colFamily)).append("/");
+    sb.append(fileId);
+    if (isReference()) {
+      sb.append("-").append(reference.toString());
+    }
+    return sb.toString();
+  }
+  
+  static boolean isTopFileRegion(final Range r) {
+    return r.equals(Range.top);
+  }
+
+  private static String createHStoreFilename(final long fid,
+      final int encodedRegionName) {
+    return Long.toString(fid) + 
+      ((encodedRegionName != HRegionInfo.NO_HASH)?
+        "." + encodedRegionName : "");
+  }
+
+  /**
+   * @param dir Base directory
+   * @param encodedRegionName Encoding of region name.
+   * @param f Column family.
+   * @return path of the 'mapfiles' directory of the column family
+   */
+  public static Path getMapDir(Path dir, int encodedRegionName,
+      final byte [] f) {
+    final Path mapdir =
+      getFamilySubDir(dir, encodedRegionName, f, HSTORE_DATFILE_DIR);
+    return mapdir;
+  }
+
+  /**
+   * @param dir Base directory
+   * @param encodedRegionName Encoding of region name.
+   * @param f Column family.
+   * @return path of the 'info' directory of the column family
+   */
+  public static Path getInfoDir(Path dir, int encodedRegionName, byte [] f) {
+    final Path infodir =
+      getFamilySubDir(dir, encodedRegionName, f, HSTORE_INFO_DIR);
+    return infodir;
+  }
+
+  /**
+   * @param dir Base directory
+   * @param encodedRegionName Encoding of region name.
+   * @param f Column family.
+   * @return path of the 'filter' directory of the column family
+   */
+  @Deprecated
+  public static Path getFilterDir(Path dir, int encodedRegionName,
+      final byte [] f) {
+    final Path filterdir =
+      getFamilySubDir(dir, encodedRegionName, f, HSTORE_FILTER_DIR);
+    return filterdir;
+  }
+  
+  /*
+   * @param base Base directory
+   * @param encodedRegionName Encoding of region name.
+   * @param f Column family.
+   * @param subdir Subdirectory to create under column family/store directory.
+   * @return
+   */
+  private static Path getFamilySubDir(final Path base,
+      final int encodedRegionName, final byte [] f, final String subdir) {
+    return new Path(base, new Path(Integer.toString(encodedRegionName),
+        new Path(Bytes.toString(f), subdir)));
+  }
+
+  /*
+   * Pointer to a store file that lives in another region: which region,
+   * which file, which half of that file we serve and the key that divides
+   * the halves.
+   */
+  static class Reference implements Writable {
+    private int encodedRegionName;
+    private long fileid;
+    private Range region;
+    private HStoreKey midkey;
+
+    /* No-argument constructor; fields are then filled in by readFields. */
+    Reference() {
+      this(-1, -1, null, Range.bottom);
+    }
+
+    /*
+     * @param ern encoded name of the referenced region
+     * @param fid id of the referenced store file
+     * @param m key dividing top from bottom
+     * @param fr half of the referenced file to serve
+     */
+    Reference(final int ern, final long fid, final HStoreKey m,
+        final Range fr) {
+      this.encodedRegionName = ern;
+      this.fileid = fid;
+      this.midkey = m;
+      this.region = fr;
+    }
+
+    int getEncodedRegionName() {
+      return encodedRegionName;
+    }
+
+    long getFileId() {
+      return this.fileid;
+    }
+
+    Range getFileRegion() {
+      return this.region;
+    }
+
+    HStoreKey getMidkey() {
+      return this.midkey;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder();
+      sb.append(encodedRegionName).append("/");
+      sb.append(fileid).append("/");
+      sb.append(region);
+      return sb.toString();
+    }
+
+    // Writable implementation.
+
+    /** {@inheritDoc} */
+    public void write(DataOutput out) throws IOException {
+      // The encoded region name goes out as a String rather than an int so
+      // that a serialized Reference stays backward compatible with
+      // pre-HBASE-82 serializations.  The alternative is rewriting all
+      // info files in hbase (serialized References are written into the
+      // 'info' file that accompanies HBase Store files).
+      final String ern = Integer.toString(this.encodedRegionName);
+      out.writeUTF(ern);
+      out.writeLong(this.fileid);
+      // True means we serve the top of the file.
+      out.writeBoolean(isTopFileRegion(this.region));
+      this.midkey.write(out);
+    }
+
+    /** {@inheritDoc} */
+    public void readFields(DataInput in) throws IOException {
+      final String ern = in.readUTF();
+      this.encodedRegionName = Integer.parseInt(ern);
+      this.fileid = in.readLong();
+      // A true flag means top, false means bottom.
+      this.region = in.readBoolean()? Range.top: Range.bottom;
+      this.midkey = new HStoreKey();
+      this.midkey.readFields(in);
+    }
+  }
+
+  /**
+   * Hbase customizations of MapFile: fixed key and value classes, a bloom
+   * filter key maker, a reader that can cache data file blocks and a writer
+   * with a configurable index interval.
+   */
+  static class HbaseMapFile extends MapFile {
+    static final Class<? extends Writable>  KEY_CLASS = HStoreKey.class;
+    static final Class<? extends Writable>  VALUE_CLASS =
+      ImmutableBytesWritable.class;
+
+    /**
+     * Custom bloom filter key maker.
+     * @param key Must be a HStoreKey; it is cast without a check.
+     * @return Key made of bytes of row only.
+     */
+    protected static Key getBloomFilterKey(WritableComparable key) {
+      return new Key(((HStoreKey) key).getRow());
+    }
+
+    /**
+     * A reader capable of reading and caching blocks of the data file.
+     */
+    static class HbaseReader extends MapFile.Reader {
+
+      // Read by createDataFileReader to decide whether to wrap the data
+      // file stream in a BlockFSInputStream.
+      private final boolean blockCacheEnabled;
+
+      /**
+       * Opens a reader with the block cache disabled.
+       * @param fs
+       * @param dirName
+       * @param conf
+       * @throws IOException
+       */
+      public HbaseReader(FileSystem fs, String dirName, Configuration conf)
+      throws IOException {
+        this(fs, dirName, conf, false);
+      }
+      
+      /**
+       * @param fs
+       * @param dirName
+       * @param conf
+       * @param blockCacheEnabled If true, data file reads go through a
+       * BlockFSInputStream.
+       * @throws IOException
+       */
+      public HbaseReader(FileSystem fs, String dirName, Configuration conf,
+          boolean blockCacheEnabled)
+      throws IOException {
+        super(fs, dirName, null, conf, false); // defer opening streams
+        // NOTE(review): the open is presumably deferred so that this flag is
+        // set before createDataFileReader() runs -- confirm against
+        // MapFile.Reader.
+        this.blockCacheEnabled = blockCacheEnabled;
+        open(fs, dirName, null, conf);
+        
+        // Force reading of the mapfile index by calling midKey.
+        // Reading the index will bring the index into memory over
+        // here on the client and then close the index file freeing
+        // up socket connection and resources in the datanode. 
+        // Usually, the first access on a MapFile.Reader will load the
+        // index; we force the issue in HStoreFile MapFiles because an
+        // access may not happen for some time; meantime we're
+        // using up datanode resources.  See HADOOP-2341.
+        midKey();
+      }
+
+      /**
+       * Returns the stock data file reader unless the block cache is enabled,
+       * in which case the data file stream is wrapped in a
+       * BlockFSInputStream whose block size is read from
+       * <code>hbase.hstore.blockCache.blockSize</code> (default 64KB).
+       */
+      @Override
+      protected org.apache.hadoop.io.SequenceFile.Reader createDataFileReader(
+          FileSystem fs, Path dataFile, Configuration conf)
+      throws IOException {
+        if (!blockCacheEnabled) {
+          return super.createDataFileReader(fs, dataFile, conf);
+        }
+        LOG.info("Block Cache enabled");
+        final int blockSize = conf.getInt("hbase.hstore.blockCache.blockSize",
+            64 * 1024);
+        return new SequenceFile.Reader(fs, dataFile,  conf) {
+          @Override
+          protected FSDataInputStream openFile(FileSystem fs, Path file,
+              int bufferSize, long length) throws IOException {
+            
+            return new FSDataInputStream(new BlockFSInputStream(
+                    super.openFile(fs, file, bufferSize, length), length,
+                    blockSize));
+          }
+        };
+      }
+    }
+    
+    /**
+     * A writer fixed to the HStoreFile key and value classes.
+     */
+    static class HbaseWriter extends MapFile.Writer {
+      /**
+       * @param conf
+       * @param fs
+       * @param dirName
+       * @param compression
+       * @throws IOException
+       */
+      public HbaseWriter(Configuration conf, FileSystem fs, String dirName,
+          SequenceFile.CompressionType compression)
+      throws IOException {
+        super(conf, fs, dirName, KEY_CLASS, VALUE_CLASS, compression);
+        // Default for mapfiles is 128.  Makes random reads faster if we
+        // have more keys indexed and we're not 'next'-ing around in the
+        // mapfile.  Overridable through hbase.io.index.interval.
+        setIndexInterval(conf.getInt("hbase.io.index.interval", 128));
+      }
+    }
+  }
+  
+  /**
+   * On write, all keys are added to a bloom filter.  On read, all keys are
+   * tested first against bloom filter. Keys are HStoreKey.  If passed bloom
+   * filter is null, just passes invocation to parent.
+   */
+  static class BloomFilterMapFile extends HbaseMapFile {
+    protected static final String BLOOMFILTER_FILE_NAME = "filter";
+
+    static class Reader extends HbaseReader {
+      private final BloomFilter bloomFilter;
+
+      /**
+       * @param fs
+       * @param dirName
+       * @param conf
+       * @param filter
+       * @param blockCacheEnabled
+       * @throws IOException
+       */
+      public Reader(FileSystem fs, String dirName, Configuration conf,
+          final boolean filter, final boolean blockCacheEnabled)
+      throws IOException {
+        super(fs, dirName, conf, blockCacheEnabled);
+        if (filter) {
+          this.bloomFilter = loadBloomFilter(fs, dirName);
+        } else {
+          this.bloomFilter = null;
+        }
+      }
+
+      private BloomFilter loadBloomFilter(FileSystem fs, String dirName)
+      throws IOException {
+        Path filterFile = new Path(dirName, BLOOMFILTER_FILE_NAME);
+        if(!fs.exists(filterFile)) {
+          throw new FileNotFoundException("Could not find bloom filter: " +
+              filterFile);
+        }
+        BloomFilter filter = new BloomFilter();
+        FSDataInputStream in = fs.open(filterFile);
+        try {
+          bloomFilter.readFields(in);
+        } finally {
+          fs.close();
+        }
+        return filter;
+      }
+      
+      /** {@inheritDoc} */
+      @Override
+      public Writable get(WritableComparable key, Writable val)
+      throws IOException {
+        if (bloomFilter == null) {
+          return super.get(key, val);
+        }
+        if(bloomFilter.membershipTest(getBloomFilterKey(key))) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("bloom filter reported that key exists");
+          }
+          return super.get(key, val);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("bloom filter reported that key does not exist");
+        }
+        return null;
+      }
+
+      /** {@inheritDoc} */
+      @Override
+      public WritableComparable getClosest(WritableComparable key,
+          Writable val) throws IOException {
+        if (bloomFilter == null) {
+          return super.getClosest(key, val);
+        }
+        // Note - the key being passed to us is always a HStoreKey
+        if(bloomFilter.membershipTest(getBloomFilterKey(key))) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("bloom filter reported that key exists");
+          }
+          return super.getClosest(key, val);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("bloom filter reported that key does not exist");
+        }
+        return null;
+      }
+      
+      /* @return size of the bloom filter */
+      int getBloomFilterSize() {
+        return bloomFilter == null ? 0 : bloomFilter.getVectorSize();
+      }
+    }
+    
+    /**
+     * Writer that, when asked to, adds the bloom filter key of every
+     * appended key to a bloom filter and writes the filter out on close.
+     */
+    static class Writer extends HbaseWriter {
+      private static final double DEFAULT_NUMBER_OF_HASH_FUNCTIONS = 4.0;
+      // Null when no bloom filter was requested.
+      private final BloomFilter bloomFilter;
+      private final String dirName;
+      private final FileSystem fs;
+      
+      /**
+       * @param conf
+       * @param fs
+       * @param dirName
+       * @param compression
+       * @param filter If true, maintain a bloom filter.
+       * @param nrows Expected number of rows; sizes the bloom filter.
+       * @throws IOException
+       */
+      @SuppressWarnings("unchecked")
+      public Writer(Configuration conf, FileSystem fs, String dirName,
+        SequenceFile.CompressionType compression, final boolean filter,
+        int nrows)
+      throws IOException {
+        super(conf, fs, dirName, compression);
+        this.dirName = dirName;
+        this.fs = fs;
+        this.bloomFilter = filter? newBloomFilter(nrows): null;
+      }
+
+      /*
+       * Vector size and number of hash functions cannot be determined
+       * automatically, and bloom filters are very sensitive to the number of
+       * elements inserted into them.  For HBase, the number of entries
+       * depends on the size of the data stored in the column; with the
+       * current default region size of 256MB that is approximately
+       * 256MB / (average value size for column).
+       *
+       * With m the number of bits in the filter (vectorSize), n the number
+       * of inserted elements and k the number of hash functions (nbHash),
+       * Broder and Mitzenmacher
+       * ( http://www.eecs.harvard.edu/~michaelm/NEWWORK/postscripts/BloomFilterSurvey.pdf )
+       * show the false positive probability is minimized when k is
+       * approximately m/n ln(2).
+       *
+       * NOTE(review): the hash function count is passed as the first
+       * constructor argument and the computed size as the second -- confirm
+       * this matches the parameter order of BloomFilter(int, int).
+       */
+      private static BloomFilter newBloomFilter(final int nrows) {
+        final int firstArg = (int) DEFAULT_NUMBER_OF_HASH_FUNCTIONS;
+        final int secondArg = (int) Math.ceil(
+            (DEFAULT_NUMBER_OF_HASH_FUNCTIONS * (1.0 * nrows)) /
+            Math.log(2.0));
+        return new BloomFilter(firstArg, secondArg);
+      }
+
+      /** {@inheritDoc} */
+      @Override
+      public void append(WritableComparable key, Writable val)
+      throws IOException {
+        if (this.bloomFilter != null) {
+          final Key filterKey = getBloomFilterKey(key);
+          this.bloomFilter.add(filterKey);
+        }
+        super.append(key, val);
+      }
+
+      /** {@inheritDoc} */
+      @Override
+      public synchronized void close() throws IOException {
+        super.close();
+        if (this.bloomFilter == null) {
+          return;
+        }
+        flushBloomFilter();
+      }
+      
+      /**
+       * Writes the bloom filter to the filter file in the MapFile directory.
+       * 
+       * @throws IOException
+       */
+      private void flushBloomFilter() throws IOException {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("flushing bloom filter for " + this.dirName);
+        }
+        final Path filterFile = new Path(this.dirName, BLOOMFILTER_FILE_NAME);
+        final FSDataOutputStream stream = this.fs.create(filterFile);
+        try {
+          this.bloomFilter.write(stream);
+        } finally {
+          stream.close();
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("flushed bloom filter for " + this.dirName);
+        }
+      }
+    }
+  }
+  
+  /**
+   * A facade for a {@link MapFile.Reader} that serves up either the top or
+   * bottom half of a MapFile (where 'bottom' is the first half of the file
+   * containing the keys that sort lowest and 'top' is the second half of the
+   * file with keys that sort greater than those of the bottom half).
+   * Subclasses BloomFilterMapFile.Reader so that a requested bloom filter is
+   * still consulted on reads.
+   * 
+   * <p>This file is not splittable.  Calls to {@link #midKey()} return null.
+   */
+  static class HalfMapFileReader extends BloomFilterMapFile.Reader {
+    // True if serving the top half, i.e. keys at or above midkey.
+    private final boolean top;
+    // Key that divides the bottom half from the top half.
+    private final WritableComparable midkey;
+    private boolean firstNextCall = true;
+    
+    /*
+     * Opens a half reader with neither bloom filter nor block cache.
+     * @param r which half of the MapFile to serve
+     * @param midKey key dividing the two halves
+     */
+    HalfMapFileReader(final FileSystem fs, final String dirName, 
+        final Configuration conf, final Range r,
+        final WritableComparable midKey)
+    throws IOException {
+      this(fs, dirName, conf, r, midKey, false, false);
+    }
+    
+    /*
+     * @param r which half of the MapFile to serve
+     * @param midKey key dividing the two halves
+     * @param filter if true, load and use the bloom filter
+     * @param blockCacheEnabled if true, cache data file blocks
+     */
+    HalfMapFileReader(final FileSystem fs, final String dirName, 
+        final Configuration conf, final Range r,
+        final WritableComparable midKey, final boolean filter,
+        final boolean blockCacheEnabled)
+    throws IOException {
+      super(fs, dirName, conf, filter, blockCacheEnabled);
+      this.top = isTopFileRegion(r);
+      this.midkey = midKey;
+    }
+    
+    @SuppressWarnings("unchecked")
+    private void checkKey(final WritableComparable key)
+    throws IOException {
+      if (top) {
+        if (key.compareTo(midkey) < 0) {
+          throw new IOException("Illegal Access: Key is less than midKey of " +
+          "backing mapfile");
+        }
+      } else if (key.compareTo(midkey) >= 0) {
+        throw new IOException("Illegal Access: Key is greater than or equal " +
+        "to midKey of backing mapfile");
+      }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public synchronized void finalKey(WritableComparable key)
+    throws IOException {
+      if (top) {
+        super.finalKey(key); 
+      } else {
+        reset();
+        Writable value = new ImmutableBytesWritable();
+        WritableComparable k = super.getClosest(midkey, value, true);
+        ByteArrayOutputStream byteout = new ByteArrayOutputStream();
+        DataOutputStream out = new DataOutputStream(byteout);
+        k.write(out);
+        ByteArrayInputStream bytein =
+          new ByteArrayInputStream(byteout.toByteArray());
+        DataInputStream in = new DataInputStream(bytein);
+        key.readFields(in);
+      }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public synchronized Writable get(WritableComparable key, Writable val)
+        throws IOException {
+      checkKey(key);
+      return super.get(key, val);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override
+    public synchronized WritableComparable getClosest(WritableComparable key,
+      Writable val)
+    throws IOException {
+      WritableComparable closest = null;
+      if (top) {
+        // If top, the lowest possible key is midkey.  Do not have to check
+        // what comes back from super getClosest.  Will return exact match or
+        // greater.
+        closest = (key.compareTo(this.midkey) < 0)?
+          this.midkey: super.getClosest(key, val);
+      } else {
+        // We're serving bottom of the file.
+        if (key.compareTo(this.midkey) < 0) {
+          // Check key is within range for bottom.
+          closest = super.getClosest(key, val);
+          // midkey was made against largest store file at time of split. Smaller
+          // store files could have anything in them.  Check return value is
+          // not beyond the midkey (getClosest returns exact match or next
+          // after).
+          if (closest != null && closest.compareTo(this.midkey) >= 0) {
+            // Don't let this value out.
+            closest = null;
+          }
+        }
+        // Else, key is > midkey so let out closest = null.
+      }
+      return closest;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unused")
+    @Override
+    public synchronized WritableComparable midKey() throws IOException {
+      // Returns null to indicate file is not splitable.
+      return null;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override
+    public synchronized boolean next(WritableComparable key, Writable val)
+    throws IOException {
+      if (firstNextCall) {
+        firstNextCall = false;
+        if (this.top) {
+          // Seek to midkey.  Midkey may not exist in this file.  That should be
+          // fine.  Then we'll either be positioned at end or start of file.
+          WritableComparable nearest = getClosest(midkey, val);
+          // Now copy the mid key into the passed key.
+          if (nearest != null) {
+            Writables.copyWritable(nearest, key);
+            return true;
+          }
+          return false;
+        }
+      }
+      boolean result = super.next(key, val);
+      if (!top && key.compareTo(midkey) >= 0) {
+        result = false;
+      }
+      return result;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public synchronized void reset() throws IOException {
+      if (top) {
+        firstNextCall = true;
+        seek(midkey);
+        return;
+      }
+      super.reset();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public synchronized boolean seek(WritableComparable key)
+    throws IOException {
+      checkKey(key);
+      return super.seek(key);
+    }
+  }
+}
\ No newline at end of file

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HStoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HStoreScanner.java?rev=677517&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HStoreScanner.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HStoreScanner.java Thu Jul 17 00:17:26 2008
@@ -0,0 +1,269 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.migration.v5;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Scanner scans both the memcache and the HStore.  It merges the output of
+ * two sub-scanners -- one over the store's memcache and one over its store
+ * files -- row by row.
+ */
+class HStoreScanner implements InternalScanner {
+  static final Log LOG = LogFactory.getLog(HStoreScanner.class);
+
+  // Parallel arrays, one slot per sub-scanner.  A slot is nulled out in all
+  // three arrays once its scanner has been closed (see closeScanner).
+  private final InternalScanner[] scanners;
+  private final TreeMap<byte [], Cell>[] resultSets;
+  private final HStoreKey[] keys;
+  private boolean wildcardMatch = false;
+  private boolean multipleMatchers = false;
+  private final RowFilterInterface dataFilter;
+  private final HStore store;
+
+  /**
+   * Create a Scanner with a handle on the memcache and HStore files.
+   * @param store store to scan
+   * @param targetCols columns to match
+   * @param firstRow row to start scanning at
+   * @param timestamp timestamp handed to the sub-scanners
+   * @param filter optional row filter; may be null
+   * @throws IOException if a sub-scanner cannot be opened or primed; any
+   * sub-scanner already opened is closed before the exception propagates
+   */
+  @SuppressWarnings("unchecked")
+  HStoreScanner(HStore store, byte [][] targetCols, byte [] firstRow,
+    long timestamp, RowFilterInterface filter)
+  throws IOException {
+    this.store = store;
+    this.dataFilter = filter;
+    if (null != dataFilter) {
+      dataFilter.reset();
+    }
+    this.scanners = new InternalScanner[2];
+    this.resultSets = new TreeMap[scanners.length];
+    this.keys = new HStoreKey[scanners.length];
+
+    try {
+      scanners[0] = store.memcache.getScanner(timestamp, targetCols, firstRow);
+      scanners[1] = new StoreFileScanner(store, timestamp, targetCols, firstRow);
+      for (int i = 0; i < scanners.length; i++) {
+        if (scanners[i].isWildcardScanner()) {
+          this.wildcardMatch = true;
+        }
+        if (scanners[i].isMultipleMatchScanner()) {
+          this.multipleMatchers = true;
+        }
+      }
+
+      // Advance to the first key in each scanner.
+      // All results will match the required column-set and scanTime.
+      // Done inside the try so that a failure while priming does not leak
+      // the scanners that were opened above.
+      for (int i = 0; i < scanners.length; i++) {
+        keys[i] = new HStoreKey();
+        resultSets[i] = new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
+        if(scanners[i] != null && !scanners[i].next(keys[i], resultSets[i])) {
+          closeScanner(i);
+        }
+      }
+    } catch(IOException e) {
+      for (int i = 0; i < this.scanners.length; i++) {
+        if(scanners[i] != null) {
+          closeScanner(i);
+        }
+      }
+      throw e;
+    }
+  }
+
+  /** @return true if the scanner is a wild card scanner */
+  public boolean isWildcardScanner() {
+    return wildcardMatch;
+  }
+
+  /** @return true if the scanner is a multiple match scanner */
+  public boolean isMultipleMatchScanner() {
+    return multipleMatchers;
+  }
+
+  /**
+   * Fetches the next row that survives filtering, merging the cells offered
+   * by the sub-scanners.
+   * @param key set to the chosen row and timestamp with an empty column
+   * @param results receives the cells of the row, keyed by column; must not
+   * be null
+   * @return true if <code>results</code> was populated, false once the scan
+   * is exhausted (all sub-scanners are closed in that case)
+   * @throws IOException if a sub-scanner fails
+   */
+  public boolean next(HStoreKey key, SortedMap<byte [], Cell> results)
+    throws IOException {
+
+    // Filtered flag is set by filters.  If a cell has been 'filtered out'
+    // -- i.e. it is not to be returned to the caller -- the flag is 'true'.
+    boolean filtered = true;
+    boolean moreToFollow = true;
+    while (filtered && moreToFollow) {
+      // Find the lowest-possible key: lowest row, and within the same row
+      // the highest timestamp.
+      byte [] chosenRow = null;
+      long chosenTimestamp = -1;
+      for (int i = 0; i < this.keys.length; i++) {
+        if (scanners[i] != null &&
+            (chosenRow == null ||
+            (Bytes.compareTo(keys[i].getRow(), chosenRow) < 0) ||
+            ((Bytes.compareTo(keys[i].getRow(), chosenRow) == 0) &&
+            (keys[i].getTimestamp() > chosenTimestamp)))) {
+          chosenRow = keys[i].getRow();
+          chosenTimestamp = keys[i].getTimestamp();
+        }
+      }
+
+      // Filter whole row by row key?
+      filtered = dataFilter != null? dataFilter.filterRowKey(chosenRow) : false;
+
+      // Store the key and results for each sub-scanner. Merge them as
+      // appropriate.
+      if (chosenTimestamp >= 0 && !filtered) {
+        // Here we are setting the passed in key with current row+timestamp
+        key.setRow(chosenRow);
+        key.setVersion(chosenTimestamp);
+        key.setColumn(HConstants.EMPTY_BYTE_ARRAY);
+        // Keep list of deleted cell keys within this row.  We need this
+        // because as we go through scanners, the delete record may be in an
+        // early scanner and then the same record with a non-delete, non-null
+        // value in a later. Without history of what we've seen, we'll return
+        // deleted values. This List should not ever grow too large since we
+        // are only keeping rows and columns that match those set on the
+        // scanner and which have delete values.  If memory usage becomes a
+        // problem, could redo as bloom filter.
+        List<HStoreKey> deletes = new ArrayList<HStoreKey>();
+        for (int i = 0; i < scanners.length && !filtered; i++) {
+          while ((scanners[i] != null
+              && !filtered
+              && moreToFollow)
+              && (Bytes.compareTo(keys[i].getRow(), chosenRow) == 0)) {
+            // If we are doing a wild card match or there are multiple
+            // matchers per column, we need to scan all the older versions of
+            // this row to pick up the rest of the family members
+            if (!wildcardMatch
+                && !multipleMatchers
+                && (keys[i].getTimestamp() != chosenTimestamp)) {
+              break;
+            }
+
+            // NOTE: We used to do results.putAll(resultSets[i]);
+            // but this had the effect of overwriting newer
+            // values with older ones. So now we only insert
+            // a result if the map does not contain the key.
+            HStoreKey hsk = new HStoreKey(key.getRow(), HConstants.EMPTY_BYTE_ARRAY,
+              key.getTimestamp());
+            for (Map.Entry<byte [], Cell> e : resultSets[i].entrySet()) {
+              hsk.setColumn(e.getKey());
+              if (HLogEdit.isDeleted(e.getValue().getValue())) {
+                if (!deletes.contains(hsk)) {
+                  // Key changes as we cycle the for loop so add a copy to
+                  // the set of deletes.
+                  deletes.add(new HStoreKey(hsk));
+                }
+              } else if (!deletes.contains(hsk) &&
+                  !filtered &&
+                  moreToFollow &&
+                  !results.containsKey(e.getKey())) {
+                if (dataFilter != null) {
+                  // Filter whole row by column data?
+                  filtered = dataFilter.filterColumn(chosenRow, e.getKey(),
+                      e.getValue().getValue());
+                  if (filtered) {
+                    results.clear();
+                    break;
+                  }
+                }
+                results.put(e.getKey(), e.getValue());
+              }
+            }
+            resultSets[i].clear();
+            if (!scanners[i].next(keys[i], resultSets[i])) {
+              closeScanner(i);
+            }
+          }
+        }
+      }
+
+      for (int i = 0; i < scanners.length; i++) {
+        // If the current scanner is non-null AND has a lower-or-equal
+        // row label, then its timestamp is bad. We need to advance it.
+        while ((scanners[i] != null) &&
+            (Bytes.compareTo(keys[i].getRow(), chosenRow) <= 0)) {
+          resultSets[i].clear();
+          if (!scanners[i].next(keys[i], resultSets[i])) {
+            closeScanner(i);
+          }
+        }
+      }
+
+      // chosenTimestamp stays at -1 only when no open scanner was left.
+      moreToFollow = chosenTimestamp >= 0;
+
+      if (dataFilter != null) {
+        if (dataFilter.filterAllRemaining()) {
+          moreToFollow = false;
+        }
+      }
+
+      if (results.size() <= 0 && !filtered) {
+        // There were no results found for this row.  Marked it as
+        // 'filtered'-out otherwise we will not move on to the next row.
+        filtered = true;
+      }
+    }
+
+    // If we got no results, then there is no more to follow.
+    if (results.size() <= 0) {
+      moreToFollow = false;
+    }
+
+    // Make sure scanners closed if no more results
+    if (!moreToFollow) {
+      close();
+    }
+
+    return moreToFollow;
+  }
+
+  /**
+   * Shut down a single scanner.  A failure to close is logged, not thrown;
+   * the scanner's slot is nulled out either way.
+   * @param i index of the scanner to close; the slot must be non-null
+   */
+  void closeScanner(int i) {
+    try {
+      try {
+        scanners[i].close();
+      } catch (IOException e) {
+        LOG.warn(store.storeName + " failed closing scanner " + i, e);
+      }
+    } finally {
+      scanners[i] = null;
+      keys[i] = null;
+      resultSets[i] = null;
+    }
+  }
+
+  /** Closes every sub-scanner that is still open. */
+  public void close() {
+    for(int i = 0; i < scanners.length; i++) {
+      if(scanners[i] != null) {
+        closeScanner(i);
+      }
+    }
+  }
+}

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HTableDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HTableDescriptor.java?rev=677517&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HTableDescriptor.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HTableDescriptor.java Thu Jul 17 00:17:26 2008
@@ -0,0 +1,328 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.migration.v5;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * HTableDescriptor contains the name of an HTable, and its
+ * column families.
+ */
+public class HTableDescriptor implements WritableComparable {
+  /** Table descriptor for <code>-ROOT-</code> catalog table */
+  public static final HTableDescriptor ROOT_TABLEDESC = new HTableDescriptor(
+      HConstants.ROOT_TABLE_NAME,
+      new HColumnDescriptor[] { new HColumnDescriptor(HConstants.COLUMN_FAMILY,
+          1, HColumnDescriptor.CompressionType.NONE, false, false,
+          Integer.MAX_VALUE, HConstants.FOREVER, false) });
+
+  /** Table descriptor for <code>.META.</code> catalog table */
+  public static final HTableDescriptor META_TABLEDESC = new HTableDescriptor(
+      HConstants.META_TABLE_NAME, new HColumnDescriptor[] {
+          new HColumnDescriptor(HConstants.COLUMN_FAMILY, 1,
+              HColumnDescriptor.CompressionType.NONE, false, false,
+              Integer.MAX_VALUE, HConstants.FOREVER, false),
+          new HColumnDescriptor(HConstants.COLUMN_FAMILY_HISTORIAN,
+              HConstants.ALL_VERSIONS, HColumnDescriptor.CompressionType.NONE,
+              false, false, Integer.MAX_VALUE, HConstants.FOREVER, false) });
+
+  private boolean rootregion = false;
+  private boolean metaregion = false;
+  private byte [] name = HConstants.EMPTY_BYTE_ARRAY;
+  private String nameAsString = "";
+
+  /** Label used for the families section of {@link #toString()}. */
+  public static final String FAMILIES = "FAMILIES";
+
+  // Key is hash of the family name.
+  private final Map<Integer, HColumnDescriptor> families =
+    new HashMap<Integer, HColumnDescriptor>();
+
+  /**
+   * Private constructor used internally creating table descriptors for
+   * catalog tables: e.g. .META. and -ROOT-.
+   */
+  private HTableDescriptor(final byte [] name, HColumnDescriptor[] families) {
+    this.name = name.clone();
+    // Keep the String form in step with the byte form; otherwise
+    // getNameAsString() on the catalog descriptors returns "".
+    this.nameAsString = Bytes.toString(this.name);
+    setMetaFlags(name);
+    for(HColumnDescriptor descriptor : families) {
+      this.families.put(Bytes.mapKey(descriptor.getName()), descriptor);
+    }
+  }
+
+  /**
+   * Constructs an empty object.
+   * For deserializing an HTableDescriptor instance only.
+   * @see #HTableDescriptor(byte[])
+   */
+  public HTableDescriptor() {
+    super();
+  }
+
+  /**
+   * Constructor.
+   * @param name Table name.
+   * @throws IllegalArgumentException if passed a user-space table name
+   * that is made of other than 'word' characters or underscores: i.e.
+   * <code>[a-zA-Z_0-9]</code>.
+   * @see <a href="HADOOP-1581">HADOOP-1581 HBASE: Un-openable tablename bug</a>
+   */
+  public HTableDescriptor(final String name) {
+    this(Bytes.toBytes(name));
+  }
+
+  /**
+   * Constructor.  The catalog table names are accepted as-is; any other
+   * name is run through {@link #isLegalTableName(byte[])}.
+   * @param name Table name.
+   * @throws IllegalArgumentException if passed a user-space table name
+   * that is made of other than 'word' characters or underscores: i.e.
+   * <code>[a-zA-Z_0-9]</code>.
+   * @see <a href="HADOOP-1581">HADOOP-1581 HBASE: Un-openable tablename bug</a>
+   */
+  public HTableDescriptor(final byte [] name) {
+    setMetaFlags(name);
+    this.name = this.metaregion? name: isLegalTableName(name);
+    this.nameAsString = Bytes.toString(this.name);
+  }
+
+  /*
+   * Set meta flags on this table.
+   * Called by constructors.
+   * @param name
+   */
+  private void setMetaFlags(final byte [] name) {
+    this.rootregion = Bytes.equals(name, HConstants.ROOT_TABLE_NAME);
+    this.metaregion =
+      this.rootregion? true: Bytes.equals(name, HConstants.META_TABLE_NAME);
+  }
+
+  /**
+   * Check passed buffer is legal user-space table name.
+   * @param b Table name.
+   * @return Returns passed <code>b</code> param
+   * @throws IllegalArgumentException if passed <code>b</code> is null or
+   * empty, or is made of other than 'word' characters or underscores: i.e.
+   * <code>[a-zA-Z_0-9]</code>.
+   */
+  public static byte [] isLegalTableName(final byte [] b) {
+    if (b == null || b.length <= 0) {
+      throw new IllegalArgumentException("Name is null or empty");
+    }
+    for (int i = 0; i < b.length; i++) {
+      if (Character.isLetterOrDigit(b[i]) || b[i] == '_') {
+        continue;
+      }
+      throw new IllegalArgumentException("Illegal character <" + b[i] + ">. " +
+        "User-space table names can only contain 'word characters':" +
+        "i.e. [a-zA-Z_0-9]: " + Bytes.toString(b));
+    }
+    return b;
+  }
+
+  /** @return true if this is the root region */
+  public boolean isRootRegion() {
+    return rootregion;
+  }
+
+  /** @return true if table is the meta table */
+  public boolean isMetaTable() {
+    return metaregion && !rootregion;
+  }
+
+  /** @return true if this is a meta region (part of the root or meta tables) */
+  public boolean isMetaRegion() {
+    return metaregion;
+  }
+
+  /** @return name of table */
+  public byte [] getName() {
+    return name;
+  }
+
+  /** @return name of table */
+  public String getNameAsString() {
+    return this.nameAsString;
+  }
+
+  /**
+   * Adds a column family.  A family already registered under the same name
+   * is replaced.
+   * @param family HColumnDescriptor of family to add.
+   * @throws NullPointerException if the family name is null or empty
+   */
+  public void addFamily(final HColumnDescriptor family) {
+    if (family.getName() == null || family.getName().length <= 0) {
+      throw new NullPointerException("Family name cannot be null or empty");
+    }
+    this.families.put(Bytes.mapKey(family.getName()), family);
+  }
+
+  /**
+   * Checks to see if this table contains the given column family
+   * @param c Family name or column name.
+   * @return true if the table contains the specified family name
+   */
+  public boolean hasFamily(final byte [] c) {
+    return hasFamily(c, HStoreKey.getFamilyDelimiterIndex(c));
+  }
+
+  /**
+   * Checks to see if this table contains the given column family
+   * @param c Family name or column name.
+   * @param index Index to column family delimiter
+   * @return true if the table contains the specified family name
+   */
+  public boolean hasFamily(final byte [] c, final int index) {
+    // If index is -1, then presume we were passed a column family name minus
+    // the colon delimiter.
+    return families.containsKey(Bytes.mapKey(c, index == -1? c.length: index));
+  }
+
+  /**
+   * @return Name of this table and then a map of all of the column family
+   * descriptors.
+   * @see #getNameAsString()
+   */
+  @Override
+  public String toString() {
+    return HConstants.NAME + " => '" + Bytes.toString(this.name) +
+      "', " + FAMILIES + " => " + this.families.values();
+  }
+
+  /**
+   * @return true if <code>obj</code> is an HTableDescriptor that compares
+   * equal to this one; false for null or any other type
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    // compareTo casts blindly, so screen out null and foreign types here
+    // rather than throwing from equals.
+    if (!(obj instanceof HTableDescriptor)) {
+      return false;
+    }
+    return compareTo(obj) == 0;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public int hashCode() {
+    // TODO: Cache.
+    int result = Bytes.hashCode(this.name);
+    for (HColumnDescriptor e: this.families.values()) {
+      result ^= e.hashCode();
+    }
+    return result;
+  }
+
+  // Writable
+
+  /** {@inheritDoc} */
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(rootregion);
+    out.writeBoolean(metaregion);
+    Bytes.writeByteArray(out, name);
+    out.writeInt(families.size());
+    for(Iterator<HColumnDescriptor> it = families.values().iterator();
+        it.hasNext(); ) {
+      it.next().write(out);
+    }
+  }
+
+  /** {@inheritDoc} */
+  public void readFields(DataInput in) throws IOException {
+    this.rootregion = in.readBoolean();
+    this.metaregion = in.readBoolean();
+    this.name = Bytes.readByteArray(in);
+    this.nameAsString = Bytes.toString(this.name);
+    int numCols = in.readInt();
+    this.families.clear();
+    for (int i = 0; i < numCols; i++) {
+      HColumnDescriptor c = new HColumnDescriptor();
+      c.readFields(in);
+      this.families.put(Bytes.mapKey(c.getName()), c);
+    }
+  }
+
+  // Comparable
+
+  /**
+   * Orders by table name, then by number of families, then by the families
+   * themselves in the iteration order of the backing map.
+   * @param o the HTableDescriptor to compare against
+   * @throws ClassCastException if <code>o</code> is not an HTableDescriptor
+   */
+  public int compareTo(Object o) {
+    HTableDescriptor other = (HTableDescriptor) o;
+    int result = Bytes.compareTo(this.name, other.name);
+    if (result == 0) {
+      // Sizes are never negative so the subtraction cannot overflow.
+      result = families.size() - other.families.size();
+    }
+
+    if (result == 0) {
+      // Sizes are equal at this point, so it2 has an element whenever it does.
+      for (Iterator<HColumnDescriptor> it = families.values().iterator(),
+          it2 = other.families.values().iterator(); it.hasNext(); ) {
+        result = it.next().compareTo(it2.next());
+        if (result != 0) {
+          break;
+        }
+      }
+    }
+    return result;
+  }
+
+  /**
+   * @return Unmodifiable view of the column family descriptors.
+   */
+  public Collection<HColumnDescriptor> getFamilies() {
+    return Collections.unmodifiableCollection(this.families.values());
+  }
+
+  /**
+   * @param column
+   * @return Column descriptor for the passed family name or the family on
+   * passed in column.
+   */
+  public HColumnDescriptor getFamily(final byte [] column) {
+    return this.families.get(HStoreKey.getFamilyMapKey(column));
+  }
+
+  /**
+   * Removes a column family from this descriptor.
+   * @param column Family name, or a column name whose family is to be removed
+   * @return Column descriptor that was removed, or null if there was none
+   * under that family.
+   */
+  public HColumnDescriptor removeFamily(final byte [] column) {
+    return this.families.remove(HStoreKey.getFamilyMapKey(column));
+  }
+
+  /**
+   * @param rootdir qualified path of HBase root directory
+   * @param tableName name of table
+   * @return path for table
+   */
+  public static Path getTableDir(Path rootdir, final byte [] tableName) {
+    return new Path(rootdir, Bytes.toString(tableName));
+  }
+}

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/LogRollListener.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/LogRollListener.java?rev=677517&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/LogRollListener.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/LogRollListener.java Thu Jul 17 00:17:26 2008
@@ -0,0 +1,29 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.migration.v5;
+
+/**
+ * Callback through which the HLog asks for a log roll.
+ */
+public interface LogRollListener {
+  /**
+   * Invoked to request that the log be rolled.
+   */
+  void logRollRequested();
+}
\ No newline at end of file

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/Memcache.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/Memcache.java?rev=677517&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/Memcache.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/Memcache.java Thu Jul 17 00:17:26 2008
@@ -0,0 +1,760 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.migration.v5;
+
+import java.io.IOException;
+import java.rmi.UnexpectedException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.regionserver.HAbstractScanner;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+
+
+/**
+ * The Memcache holds in-memory modifications to the HRegion.
+ * Keeps a current map.  When asked to flush the map, current map is moved
+ * to snapshot and is cleared.  We continue to serve edits out of new map
+ * and backing snapshot until flusher reports in that the flush succeeded. At
+ * this point we let the snapshot go.
+ */
+class Memcache {
+  private final Log LOG = LogFactory.getLog(this.getClass().getName());
+  
+  private long ttl;
+
+  // Note that these structures are always accessed with a lock held, so no
+  // additional synchronization is required.
+  
+  // The currently active sorted map of edits.
+  private volatile SortedMap<HStoreKey, byte[]> memcache =
+    createSynchronizedSortedMap();
+
+  // Snapshot of memcache.  Made for flusher.
+  private volatile SortedMap<HStoreKey, byte[]> snapshot =
+    createSynchronizedSortedMap();
+
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+  /**
+   * No-argument constructor. Used for tests.
+   * The TTL is set to {@link HConstants#FOREVER}, the value the lookup
+   * methods of this class test for before applying any expiry check.
+   */
+  public Memcache() {
+    this(HConstants.FOREVER);
+  }
+
+  /**
+   * Creates a Memcache with the given time-to-live.
+   * @param ttl The TTL for cache entries, in milliseconds.
+   */
+  public Memcache(long ttl) {
+    this.ttl = ttl;
+  }
+
+  /*
+   * Utility method.
+   * @return sycnhronized sorted map of HStoreKey to byte arrays.
+   */
+  private static SortedMap<HStoreKey, byte[]> createSynchronizedSortedMap() {
+    return Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
+  }
+
+  /**
+   * Moves the current memcache map into the snapshot slot and starts a
+   * fresh, empty memcache map.  Does nothing if a previous snapshot is still
+   * outstanding or if the memcache is empty.
+   * Snapshot must be cleared by call to {@link #clearSnapshot(SortedMap)}.
+   * To get the snapshot made by this method, use {@link #getSnapshot()}.
+   */
+  void snapshot() {
+    this.lock.writeLock().lock();
+    try {
+      if (!this.snapshot.isEmpty()) {
+        // A non-empty snapshot means the flusher failed or has not called
+        // cleanup yet.  Note it at debug level and leave everything alone.
+        LOG.debug("Snapshot called again without clearing previous. " +
+          "Doing nothing. Another ongoing flush or did we fail last attempt?");
+        return;
+      }
+      if (this.memcache.isEmpty()) {
+        // Nothing to snapshot.
+        return;
+      }
+      // No extra synchronization on the memcache map: the write lock is held.
+      this.snapshot = this.memcache;
+      this.memcache = createSynchronizedSortedMap();
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+  }
+  
+  /**
+   * Return the current snapshot.
+   * Called by flusher to get current snapshot made by a previous
+   * call to {@link #snapshot()}.  The snapshot field is returned as-is: this
+   * method takes no lock and makes no copy.
+   * @return Return snapshot.
+   * @see #snapshot()
+   * @see #clearSnapshot(SortedMap)
+   */
+  SortedMap<HStoreKey, byte[]> getSnapshot() {
+    return this.snapshot;
+  }
+
+  /**
+   * The passed snapshot was successfully persisted; it can be let go.
+   * @param ss The snapshot to clean out.
+   * @throws UnexpectedException
+   * @see {@link #snapshot()}
+   */
+  void clearSnapshot(final SortedMap<HStoreKey, byte []> ss)
+  throws UnexpectedException {
+    this.lock.writeLock().lock();
+    try {
+      if (this.snapshot != ss) {
+        throw new UnexpectedException("Current snapshot is " +
+          this.snapshot + ", was passed " + ss);
+      }
+      // OK. Passed in snapshot is same as current snapshot.  If not-empty,
+      // create a new snapshot and let the old one go.
+      if (ss.size() != 0) {
+        this.snapshot = createSynchronizedSortedMap();
+      }
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Write an update
+   * @param key
+   * @param value
+   * @return memcache size delta
+   */
+  long add(final HStoreKey key, final byte[] value) {
+    this.lock.readLock().lock();
+    try {
+      byte[] oldValue = this.memcache.remove(key);
+      this.memcache.put(key, value);
+      return key.getSize() + (value == null ? 0 : value.length) -
+          (oldValue == null ? 0 : oldValue.length);
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Look back through all the backlog TreeMaps to find the target.
+   * @param key
+   * @param numVersions
+   * @return An array of byte arrays ordered by timestamp.
+   */
+  List<Cell> get(final HStoreKey key, final int numVersions) {
+    this.lock.readLock().lock();
+    try {
+      List<Cell> results;
+      // The synchronizations here are because internalGet iterates
+      synchronized (this.memcache) {
+        results = internalGet(this.memcache, key, numVersions);
+      }
+      synchronized (this.snapshot) {
+        results.addAll(results.size(),
+          internalGet(this.snapshot, key, numVersions - results.size()));
+      }
+      return results;
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+  
+  /**
+   * Picks the lower of two byte arrays as ordered by
+   * <code>Bytes.compareTo</code>.
+   * @param a First array; may be null.
+   * @param b Second array; may be null.
+   * @return Return lowest of a or b or null if both a and b are null.  When
+   * only one is null the other is returned; when they compare equal,
+   * <code>a</code> is returned.
+   */
+  private byte [] getLowest(final byte [] a,
+      final byte [] b) {
+    if (a == null || b == null) {
+      return a == null ? b : a;
+    }
+    return Bytes.compareTo(a, b) <= 0 ? a : b;
+  }
+
+  /**
+   * @param row Find the row that comes after this one.
+   * @return Next row or null if none found
+   */
+  byte [] getNextRow(final byte [] row) {
+    this.lock.readLock().lock();
+    try {
+      return getLowest(getNextRow(row, this.memcache),
+        getNextRow(row, this.snapshot));
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+  
+  /*
+   * @param row Find row that follows this one.
+   * @param map Map to look in for a row beyond <code>row</code>.
+   * This method synchronizes on passed map while iterating it.
+   * @return Next row or null if none found.
+   */
+  private byte [] getNextRow(final byte [] row,
+      final SortedMap<HStoreKey, byte []> map) {
+    byte [] result = null;
+    // Synchronize on the map to make the tailMap making 'safe'.
+    synchronized (map) {
+      // Make an HSK with maximum timestamp so we get past most of the current
+      // rows cell entries.
+      HStoreKey hsk = new HStoreKey(row, HConstants.LATEST_TIMESTAMP);
+      SortedMap<HStoreKey, byte []> tailMap = map.tailMap(hsk);
+      // Iterate until we fall into the next row; i.e. move off current row
+      for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
+        HStoreKey itKey = es.getKey();
+        if (Bytes.compareTo(itKey.getRow(), row) <= 0) {
+          continue;
+        }
+        // Note: Not suppressing deletes or expired cells.
+        result = itKey.getRow();
+        break;
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Return all the available columns for the given key.  The key indicates a 
+   * row and timestamp, but not a column name.
+   * @param key
+   * @param columns Pass null for all columns else the wanted subset.
+   * @param deletes Map to accumulate deletes found.
+   * @param results Where to stick row results found.
+   */
+  void getFull(HStoreKey key, Set<byte []> columns, Map<byte [], Long> deletes, 
+    Map<byte [], Cell> results) {
+    this.lock.readLock().lock();
+    try {
+      // The synchronizations here are because internalGet iterates
+      synchronized (this.memcache) {
+        internalGetFull(this.memcache, key, columns, deletes, results);
+      }
+      synchronized (this.snapshot) {
+        internalGetFull(this.snapshot, key, columns, deletes, results);
+      }
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
+  /*
+   * Scans one map from <code>key</code> onward, filling
+   * <code>results</code> with the first live cell seen for each wanted
+   * column and <code>deletes</code> with the newest delete timestamp seen
+   * per column.  Expired cells met on the way are removed from
+   * <code>map</code> once the scan is over.  Callers in this class hold the
+   * map's monitor while calling, because this method iterates.
+   * @param map Map to scan; nothing is done if it is empty.
+   * @param key Row and timestamp to match; nothing is done if null.
+   * @param columns Pass null for all columns else the wanted subset.
+   * @param deletes Map to accumulate deletes found.
+   * @param results Where to stick row results found.
+   */
+  private void internalGetFull(SortedMap<HStoreKey, byte[]> map, HStoreKey key, 
+      Set<byte []> columns, Map<byte [], Long> deletes,
+      Map<byte [], Cell> results) {
+    if (map.isEmpty() || key == null) {
+      return;
+    }
+    List<HStoreKey> victims = new ArrayList<HStoreKey>();
+    SortedMap<HStoreKey, byte[]> tailMap = map.tailMap(key);
+    long now = System.currentTimeMillis();
+    for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
+      HStoreKey itKey = es.getKey();
+      byte [] itCol = itKey.getColumn();
+      if (results.get(itCol) == null && key.matchesWithoutColumn(itKey)) {
+        if (columns != null && !columns.contains(itCol)) {
+          // Not one of the requested columns.
+          continue;
+        }
+        // The entry already carries the value; no need to search the tree
+        // again with tailMap.get().
+        byte [] val = es.getValue();
+        long timestamp = itKey.getTimestamp();
+        Long deletedAt = deletes.get(itCol);
+        boolean coveredByDelete =
+          deletedAt != null && deletedAt.longValue() >= timestamp;
+        if (HLogEdit.isDeleted(val)) {
+          // Keep only the newest delete timestamp per column.
+          if (!coveredByDelete) {
+            deletes.put(itCol, Long.valueOf(timestamp));
+          }
+        } else if (!coveredByDelete) {
+          // Skip expired cells
+          if (ttl == HConstants.FOREVER || now < timestamp + ttl) {
+            results.put(itCol, new Cell(val, timestamp));
+          } else {
+            victims.add(itKey);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("internalGetFull: " + itKey + ": expired, skipped");
+            }
+          }
+        }
+      } else if (Bytes.compareTo(key.getRow(), itKey.getRow()) < 0) {
+        // Moved past the wanted row.
+        break;
+      }
+    }
+    // Remove expired victims from the map.  Done after the loop so that the
+    // view being iterated is not modified mid-iteration.
+    for (HStoreKey v: victims) {
+      map.remove(v);
+    }
+  }
+
+  /**
+   * @param row Row to look for.
+   * @param candidateKeys Map of candidate keys (Accumulation over lots of
+   * lookup over stores and memcaches)
+   */
+  void getRowKeyAtOrBefore(final byte [] row, 
+    SortedMap<HStoreKey, Long> candidateKeys) {
+    this.lock.readLock().lock();
+    try {
+      synchronized (memcache) {
+        internalGetRowKeyAtOrBefore(memcache, row, candidateKeys);
+      }
+      synchronized (snapshot) {
+        internalGetRowKeyAtOrBefore(snapshot, row, candidateKeys);
+      }
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
+  /*
+   * Searches one map for keys whose row is at or before <code>key</code>
+   * and updates <code>candidateKeys</code> in place.  Live cells are added,
+   * keyed by row/column with the timestamp stripped and mapped to the cell's
+   * timestamp.  A delete marker removes a candidate whose recorded timestamp
+   * is not newer than the delete.  Expired cells met on the way are removed
+   * from <code>map</code> once the search is over.  Callers in this class
+   * hold the map's monitor while calling.
+   * @param map Map to search.
+   * @param key Row to look for.
+   * @param candidateKeys Candidates accumulated so far; read and modified.
+   */
+  private void internalGetRowKeyAtOrBefore(SortedMap<HStoreKey, byte []> map,
+    byte [] key, SortedMap<HStoreKey, Long> candidateKeys) {
+    HStoreKey strippedKey = null;
+    
+    // we want the earliest possible to start searching from
+    HStoreKey search_key = candidateKeys.isEmpty() ? 
+      new HStoreKey(key) : new HStoreKey(candidateKeys.firstKey().getRow());
+    Iterator<HStoreKey> key_iterator = null;
+    HStoreKey found_key = null;
+    ArrayList<HStoreKey> victims = new ArrayList<HStoreKey>();
+    long now = System.currentTimeMillis();
+    // get all the entries that come equal or after our search key
+    SortedMap<HStoreKey, byte []> tailMap = map.tailMap(search_key);
+    
+    // if there are items in the tail map, there's either a direct match to
+    // the search key, or a range of values between the first candidate key
+    // and the ultimate search key (or the end of the cache)
+    if (!tailMap.isEmpty() &&
+        Bytes.compareTo(tailMap.firstKey().getRow(), key) <= 0) {
+      key_iterator = tailMap.keySet().iterator();
+
+      // keep looking at cells as long as they are no greater than the 
+      // ultimate search key and there's still records left in the map.
+      do {
+        found_key = key_iterator.next();
+        if (Bytes.compareTo(found_key.getRow(), key) <= 0) {
+          strippedKey = stripTimestamp(found_key);
+          if (HLogEdit.isDeleted(tailMap.get(found_key))) {
+            if (candidateKeys.containsKey(strippedKey)) {
+              long bestCandidateTs = 
+                candidateKeys.get(strippedKey).longValue();
+              if (bestCandidateTs <= found_key.getTimestamp()) {
+                candidateKeys.remove(strippedKey);
+              }
+            }
+          } else {
+            if (ttl == HConstants.FOREVER ||
+                  now < found_key.getTimestamp() + ttl) {
+              candidateKeys.put(strippedKey, 
+                Long.valueOf(found_key.getTimestamp()));
+            } else {
+              victims.add(found_key);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(":" + found_key + ": expired, skipped");
+              }
+            }
+          }
+        }
+      } while (Bytes.compareTo(found_key.getRow(), key) <= 0 
+        && key_iterator.hasNext());
+    } else {
+      // the tail didn't contain any keys that matched our criteria, or was 
+      // empty. examine all the keys that precede our splitting point.
+      SortedMap<HStoreKey, byte []> headMap = map.headMap(search_key);
+      
+      // if we tried to create a headMap and got an empty map, then there are
+      // no keys at or before the search key, so we're done.
+      if (headMap.isEmpty()) {
+        return;
+      }        
+      
+      // if there aren't any candidate keys at this point, we need to search
+      // backwards until we find at least one candidate or run out of headMap.
+      if (candidateKeys.isEmpty()) {
+        HStoreKey[] cells = 
+          headMap.keySet().toArray(new HStoreKey[headMap.keySet().size()]);
+           
+        byte [] lastRowFound = null;
+        for(int i = cells.length - 1; i >= 0; i--) {
+          HStoreKey thisKey = cells[i];
+          
+          // if the last row we found a candidate key for is different than
+          // the row of the current candidate, we can stop looking.
+          if (lastRowFound != null &&
+              !Bytes.equals(lastRowFound, thisKey.getRow())) {
+            break;
+          }
+          
+          // if this isn't a delete, record it as a candidate key. also 
+          // take note of the row of this candidate so that we'll know when
+          // we cross the row boundary into the previous row.
+          if (!HLogEdit.isDeleted(headMap.get(thisKey))) {
+            // Apply the same expiry test as the other branches.  This loop
+            // walks 'cells' by index, so the key under inspection is thisKey;
+            // found_key is never assigned on this path and must not be used.
+            if (ttl == HConstants.FOREVER ||
+                  now < thisKey.getTimestamp() + ttl) {
+              lastRowFound = thisKey.getRow();
+              candidateKeys.put(stripTimestamp(thisKey), 
+                Long.valueOf(thisKey.getTimestamp()));
+            } else {
+              victims.add(thisKey);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("internalGetRowKeyAtOrBefore: " + thisKey +
+                  ": expired, skipped");
+              }
+            }
+          }
+        }
+      } else {
+        // if there are already some candidate keys, we only need to consider
+        // the very last row's worth of keys in the headMap, because any 
+        // smaller acceptable candidate keys would have caused us to start
+        // our search earlier in the list, and we wouldn't be searching here.
+        SortedMap<HStoreKey, byte[]> thisRowTailMap = 
+          headMap.tailMap(new HStoreKey(headMap.lastKey().getRow()));
+          
+        key_iterator = thisRowTailMap.keySet().iterator();
+  
+        do {
+          found_key = key_iterator.next();
+          
+          if (HLogEdit.isDeleted(thisRowTailMap.get(found_key))) {
+            strippedKey = stripTimestamp(found_key);              
+            if (candidateKeys.containsKey(strippedKey)) {
+              long bestCandidateTs = 
+                candidateKeys.get(strippedKey).longValue();
+              if (bestCandidateTs <= found_key.getTimestamp()) {
+                candidateKeys.remove(strippedKey);
+              }
+            }
+          } else {
+            if (ttl == HConstants.FOREVER ||
+                    now < found_key.getTimestamp() + ttl) {
+              candidateKeys.put(stripTimestamp(found_key), 
+                Long.valueOf(found_key.getTimestamp()));
+            } else {
+              victims.add(found_key);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("internalGetRowKeyAtOrBefore: " + found_key +
+                  ": expired, skipped");
+              }
+            }
+          }
+        } while (key_iterator.hasNext());
+      }
+    }
+    // Remove expired victims from the map.  Done last so that no view of the
+    // map is modified while it is being iterated.
+    for (HStoreKey victim: victims) {
+      map.remove(victim);
+    }
+  }
+  
+  /**
+   * Builds a new key from the row and column of the passed key, using the
+   * two-argument HStoreKey constructor so that the passed key's timestamp
+   * is not carried over.
+   * @param key Key to copy row and column from.
+   * @return New HStoreKey holding <code>key</code>'s row and column.
+   */
+  static HStoreKey stripTimestamp(HStoreKey key) {
+    return new HStoreKey(key.getRow(), key.getColumn());
+  }
+  
+  /**
+   * Examine a single map for the desired key.  Deleted cells are skipped.
+   * Expired cells are skipped and removed from <code>map</code> once the
+   * scan is over.
+   *
+   * TODO - This is kinda slow.  We need a data structure that allows for 
+   * proximity-searches, not just precise-matches.
+   * 
+   * @param map Map to search; callers in this class hold its monitor.
+   * @param key Key to start searching from; entries are considered for as
+   * long as their row and column match it.
+   * @param numVersions Maximum number of cells to return; a value of zero or
+   * less means no limit.
+   * @return Ordered list of items found in passed <code>map</code>.  If no
+   * matching values, returns an empty list (does not return null).
+   */
+  private ArrayList<Cell> internalGet(
+      final SortedMap<HStoreKey, byte []> map, final HStoreKey key,
+      final int numVersions) {
+
+    ArrayList<Cell> result = new ArrayList<Cell>();
+    
+    // TODO: If get is of a particular version -- numVersions == 1 -- we
+    // should be able to avoid all of the tailmap creations and iterations
+    // below.
+    long now = System.currentTimeMillis();
+    List<HStoreKey> victims = new ArrayList<HStoreKey>();
+    SortedMap<HStoreKey, byte []> tailMap = map.tailMap(key);
+    for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
+      HStoreKey itKey = es.getKey();
+      if (!itKey.matchesRowCol(key)) {
+        // By L.N. HBASE-684, map is sorted, so we can't find match any more.
+        break;
+      }
+      // The entry already carries the value; no need to search the tree
+      // again with tailMap.get().
+      byte [] value = es.getValue();
+      if (HLogEdit.isDeleted(value)) {
+        continue;
+      }
+      // Filter out expired results
+      if (ttl == HConstants.FOREVER ||
+            now < itKey.getTimestamp() + ttl) {
+        result.add(new Cell(value, itKey.getTimestamp()));
+        if (numVersions > 0 && result.size() >= numVersions) {
+          break;
+        }
+      } else {
+        victims.add(itKey);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("internalGet: " + itKey + ": expired, skipped");
+        }
+      }
+    }
+    // Remove expired victims from the map.  Done after the loop so that the
+    // view being iterated is not modified mid-iteration.
+    for (HStoreKey v: victims) {
+      map.remove(v);
+    }
+    return result;
+  }
+
+  /**
+   * Get <code>versions</code> keys matching the origin key's
+   * row/column/timestamp and those of an older vintage
+   * Default access so can be accessed out of {@link HRegionServer}.
+   * @param origin Where to start searching.
+   * @param versions How many versions to return. Pass
+   * {@link HConstants.ALL_VERSIONS} to retrieve all.
+   * @return Ordered list of <code>versions</code> keys going from newest back.
+   * @throws IOException
+   */
+  List<HStoreKey> getKeys(final HStoreKey origin, final int versions) {
+    this.lock.readLock().lock();
+    try {
+      List<HStoreKey> results;
+      synchronized (memcache) {
+        results = internalGetKeys(this.memcache, origin, versions);
+      }
+      synchronized (snapshot) {
+        results.addAll(results.size(), internalGetKeys(snapshot, origin,
+            versions == HConstants.ALL_VERSIONS ? versions :
+              (versions - results.size())));
+      }
+      return results;
+      
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
+  /*
+   * @param origin Where to start searching.
+   * @param versions How many versions to return. Pass
+   * {@link HConstants.ALL_VERSIONS} to retrieve all.
+   * @return List of all keys that are of the same row and column and of
+   * equal or older timestamp.  If no keys, returns an empty List. Does not
+   * return null.
+   */
+  private List<HStoreKey> internalGetKeys(
+      final SortedMap<HStoreKey, byte []> map, final HStoreKey origin,
+      final int versions) {
+
+    long now = System.currentTimeMillis();
+    List<HStoreKey> result = new ArrayList<HStoreKey>();
+    List<HStoreKey> victims = new ArrayList<HStoreKey>();
+    SortedMap<HStoreKey, byte []> tailMap = map.tailMap(origin);
+    for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
+      HStoreKey key = es.getKey();
+  
+      // if there's no column name, then compare rows and timestamps
+      if (origin.getColumn() != null && origin.getColumn().length == 0) {
+        // if the current and origin row don't match, then we can jump
+        // out of the loop entirely.
+        if (!Bytes.equals(key.getRow(), origin.getRow())) {
+          break;
+        }
+        // if the rows match but the timestamp is newer, skip it so we can
+        // get to the ones we actually want.
+        if (key.getTimestamp() > origin.getTimestamp()) {
+          continue;
+        }
+      }
+      else{ // compare rows and columns
+        // if the key doesn't match the row and column, then we're done, since 
+        // all the cells are ordered.
+        if (!key.matchesRowCol(origin)) {
+          break;
+        }
+      }
+      if (!HLogEdit.isDeleted(es.getValue())) {
+        if (ttl == HConstants.FOREVER || now < key.getTimestamp() + ttl) {
+          result.add(key);
+        } else {
+          victims.add(key);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("internalGetKeys: " + key + ": expired, skipped");
+          }
+        }
+        if (result.size() >= versions) {
+          // We have enough results.  Return.
+          break;
+        }
+      }
+    }
+
+    // Clean expired victims from the map.
+    for (HStoreKey v: victims)
+       map.remove(v);
+
+    return result;
+  }
+
+
+  /**
+   * Checks whether the live memcache map holds a delete for the passed key.
+   * Only the current memcache map is looked at, not the snapshot, and this
+   * method does not take the read/write lock.
+   * @param key Key to look up.
+   * @return Whatever <code>HLogEdit.isDeleted</code> reports for the value
+   * stored under <code>key</code> in the memcache map.
+   * Use checking values in store. On occasion the memcache has the fact that
+   * the cell has been deleted.
+   */
+  boolean isDeleted(final HStoreKey key) {
+    return HLogEdit.isDeleted(this.memcache.get(key));
+  }
+
+  /**
+   * @return a scanner over the keys in the Memcache
+   */
+  InternalScanner getScanner(long timestamp,
+    final byte [][] targetCols, final byte [] firstRow)
+  throws IOException {
+    this.lock.readLock().lock();
+    try {
+      return new MemcacheScanner(timestamp, targetCols, firstRow);
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // MemcacheScanner implements the InternalScanner.
+  // It lets the caller scan the contents of the Memcache.
+  //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Scanner over the enclosing Memcache.  Each call to
+   * {@link #next(HStoreKey, SortedMap)} gathers one row's worth of cells via
+   * the enclosing class's <code>getFull</code> and then advances to the
+   * following row via <code>getNextRow</code>.
+   */
+  private class MemcacheScanner extends HAbstractScanner {
+    // Row the next call to next() will read; null once rows are exhausted.
+    private byte [] currentRow;
+    // Explicit columns to fetch; stays null for a wildcard scanner.
+    private Set<byte []> columns = null;
+    
+    /**
+     * @param timestamp Timestamp passed on to the superclass.
+     * @param targetCols Columns passed on to the superclass; also cached in
+     * a sorted set when this is not a wildcard scanner.
+     * @param firstRow Row at which scanning starts.
+     * @throws IOException Declared because the superclass constructor call
+     * may throw it -- NOTE(review): confirm against HAbstractScanner.
+     */
+    MemcacheScanner(final long timestamp, final byte [] targetCols[],
+      final byte [] firstRow)
+    throws IOException {
+      // Call to super will create ColumnMatchers and whether this is a regex
+      // scanner or not.  Will also save away timestamp.  Also sorts rows.
+      super(timestamp, targetCols);
+      this.currentRow = firstRow;
+      // If we're being asked to scan explicit columns rather than all in 
+      // a family or columns that match regexes, cache the sorted array of
+      // columns.
+      this.columns = null;
+      if (!isWildcardScanner()) {
+        this.columns = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+        for (int i = 0; i < targetCols.length; i++) {
+          this.columns.add(targetCols[i]);
+        }
+      }
+    }
+
+    /**
+     * Fills <code>results</code> with the cells of the next row that yields
+     * at least one matching column, skipping rows that yield none.
+     * @param key Set by this method to the row and timestamp being read.
+     * @param results Cleared on entry and then filled, column to cell.
+     * @return True if <code>results</code> is non-empty on return; false if
+     * the scanner is closed or no further row produced a result.
+     * @throws IOException Declared by the overridden method.
+     */
+    @Override
+    public boolean next(HStoreKey key, SortedMap<byte [], Cell> results)
+    throws IOException {
+      if (this.scannerClosed) {
+        return false;
+      }
+      // This is a treemap rather than a Hashmap because then I can have a
+      // byte array as key -- because I can independently specify a comparator.
+      Map<byte [], Long> deletes =
+        new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
+      // Catch all row results in here.  These results are then filtered to
+      // ensure they match column name regexes, or if none, added to results.
+      Map<byte [], Cell> rowResults =
+        new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
+      if (results.size() > 0) {
+        results.clear();
+      }
+      long latestTimestamp = -1;
+      // Keep advancing row by row until a row yields something or rows run
+      // out (getNextRow returned null).
+      while (results.size() <= 0 && this.currentRow != null) {
+        // Reset the per-row scratch maps left over from the previous pass.
+        if (deletes.size() > 0) {
+          deletes.clear();
+        }
+        if (rowResults.size() > 0) {
+          rowResults.clear();
+        }
+        key.setRow(this.currentRow);
+        key.setVersion(this.timestamp);
+        getFull(key, isWildcardScanner() ? null : this.columns, deletes,
+            rowResults);
+        // Surface the deletes found as cells holding the delete marker,
+        // stamped with the delete's timestamp.
+        for (Map.Entry<byte [], Long> e: deletes.entrySet()) {
+          rowResults.put(e.getKey(),
+            new Cell(HLogEdit.deleteBytes.get(), e.getValue().longValue()));
+        }
+        for (Map.Entry<byte [], Cell> e: rowResults.entrySet()) {
+          byte [] column = e.getKey();
+          Cell c = e.getValue();
+          if (isWildcardScanner()) {
+            // Check the results match.  We only check columns, not timestamps.
+            // We presume that timestamps have been handled properly when we
+            // called getFull.
+            if (!columnMatch(column)) {
+              continue;
+            }
+          }
+          // We should never return HConstants.LATEST_TIMESTAMP as the time for
+          // the row. As a compromise, we return the largest timestamp for the
+          // entries that we find that match.
+          if (c.getTimestamp() != HConstants.LATEST_TIMESTAMP &&
+              c.getTimestamp() > latestTimestamp) {
+            latestTimestamp = c.getTimestamp();
+          }
+          results.put(column, c);
+        }
+        // Advance the cursor whether or not this row produced results.
+        this.currentRow = getNextRow(this.currentRow);
+
+      }
+      // Set the timestamp to the largest one for the row if we would otherwise
+      // return HConstants.LATEST_TIMESTAMP
+      if (key.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
+        key.setVersion(latestTimestamp);
+      }
+      return results.size() > 0;
+    }
+
+    /**
+     * Marks this scanner closed, after which
+     * {@link #next(HStoreKey, SortedMap)} returns false.  Calling it again
+     * has no further effect.
+     */
+    public void close() {
+      if (!scannerClosed) {
+        scannerClosed = true;
+      }
+    }
+  }
+}