You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2008/07/17 09:17:28 UTC

svn commit: r677517 [4/6] - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/ipc/ src/java/org/apache/hadoop/hbase/master/ src/java/org/apache/hadoop/hbase/regionserv...

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HRegionInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HRegionInfo.java?rev=677517&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HRegionInfo.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HRegionInfo.java Thu Jul 17 00:17:26 2008
@@ -0,0 +1,451 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util.migration.v5;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JenkinsHash;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * HRegion information.
+ * Contains HRegion id, start and end keys, a reference to this
+ * HRegions' table descriptor, etc.
+ * 
+ * <p>This class has been modified so it instantiates using pre-v5 versions of
+ * the HTableDescriptor, etc: i.e. it will uses classes that in this
+ * migration v0_2 package.
+ */
+public class HRegionInfo implements WritableComparable {
+  /**
+   * @param regionName
+   * @return the encodedName
+   */
+  public static int encodeRegionName(final byte [] regionName) {
+    return Math.abs(JenkinsHash.hash(regionName, regionName.length, 0));
+  }
+
+  /** delimiter used between portions of a region name */
+  public static final int DELIMITER = ',';
+
+  /** HRegionInfo for root region */
+  public static final HRegionInfo ROOT_REGIONINFO =
+    new HRegionInfo(0L, HTableDescriptor.ROOT_TABLEDESC);
+
+  /** HRegionInfo for first meta region */
+  public static final HRegionInfo FIRST_META_REGIONINFO =
+    new HRegionInfo(1L, HTableDescriptor.META_TABLEDESC);
+
+  /**
+   * Extracts table name prefix from a metaregion row name.
+   * @param regionName A metaregion row name.
+   * @return The table prefix of a region name.
+   */
+  public static byte [] getTableNameFromRegionName(final byte [] regionName) {
+    return parseMetaRegionRow(regionName).get(0);
+  }
+
+  /**
+   * Parses passed metaregion row into its constituent parts.
+   * Presumes region names are ASCII characters only.
+   * @param regionName A metaregion row name.
+   * @return A list where first element is the tablename, second the row
+   * portion, and the third the id.
+   */
+  public static List<byte []> parseMetaRegionRow(final byte [] regionName) {
+    int offset = -1;
+    for (int i = 0; i < regionName.length; i++) {
+      if (regionName[i] == DELIMITER) {
+        offset = i;
+        break;
+      }
+    }
+    if (offset == -1) {
+      throw new IllegalArgumentException(Bytes.toString(regionName) +
+        " does not contain '" + DELIMITER + "' character");
+    }
+    byte [] tableName = new byte[offset];
+    System.arraycopy(regionName, 0, tableName, 0, offset);
+    // Now move in from the tail till we hit DELIMITER to find the id
+    offset = -1;
+    for (int i = regionName.length - 1; i > tableName.length; i--) {
+      if (regionName[i] == DELIMITER) {
+        offset = i;
+        break;
+      }
+    }
+    if (offset == -1) {
+      throw new IllegalArgumentException(Bytes.toString(regionName) +
+          " does not have parseable tail");
+    }
+    byte [] row = new byte[offset - (tableName.length + 1)];
+    System.arraycopy(regionName, tableName.length + 1, row, 0,
+      offset - (tableName.length + 1));
+    byte [] id = new byte[regionName.length - (offset + 1)];
+    System.arraycopy(regionName, offset + 1, id, 0,
+      regionName.length - (offset + 1));
+    // Now make up an array to hold the three parse pieces.
+    List<byte []> result = new ArrayList<byte []>(3);
+    result.add(tableName);
+    result.add(row);
+    result.add(id);
+    return result;
+  }
+
+  private byte [] endKey = HConstants.EMPTY_BYTE_ARRAY;
+  private boolean offLine = false;
+  private long regionId = -1;
+  private byte [] regionName = HConstants.EMPTY_BYTE_ARRAY;
+  private String regionNameStr = "";
+  private boolean split = false;
+  private byte [] startKey = HConstants.EMPTY_BYTE_ARRAY;
+  protected HTableDescriptor tableDesc = null;
+  private int hashCode = -1;
+  public static final int NO_HASH = -1;
+  private volatile int encodedName = NO_HASH;
+  
+  private void setHashCode() {
+    int result = this.regionName.hashCode();
+    result ^= this.regionId;
+    result ^= this.startKey.hashCode();
+    result ^= this.endKey.hashCode();
+    result ^= Boolean.valueOf(this.offLine).hashCode();
+    result ^= this.tableDesc.hashCode();
+    this.hashCode = result;
+  }
+  
+  /**
+   * Private constructor used constructing HRegionInfo for the catalog root and
+   * first meta regions
+   */
+  private HRegionInfo(long regionId, HTableDescriptor tableDesc) {
+    this.regionId = regionId;
+    this.tableDesc = tableDesc;
+    this.regionName = createRegionName(tableDesc.getName(), null, regionId);
+    this.regionNameStr = Bytes.toString(this.regionName);
+    setHashCode();
+  }
+
+  /** Default constructor - creates empty object */
+  public HRegionInfo() {
+    this.tableDesc = new HTableDescriptor();
+  }
+  
+  /**
+   * Construct HRegionInfo with explicit parameters
+   * 
+   * @param tableDesc the table descriptor
+   * @param startKey first key in region
+   * @param endKey end of key range
+   * @throws IllegalArgumentException
+   */
+  public HRegionInfo(final HTableDescriptor tableDesc, final byte [] startKey,
+      final byte [] endKey)
+  throws IllegalArgumentException {
+    this(tableDesc, startKey, endKey, false);
+  }
+
+  /**
+   * Construct HRegionInfo with explicit parameters
+   * 
+   * @param tableDesc the table descriptor
+   * @param startKey first key in region
+   * @param endKey end of key range
+   * @param split true if this region has split and we have daughter regions
+   * regions that may or may not hold references to this region.
+   * @throws IllegalArgumentException
+   */
+  public HRegionInfo(HTableDescriptor tableDesc, final byte [] startKey,
+      final byte [] endKey, final boolean split)
+  throws IllegalArgumentException {
+    this(tableDesc, startKey, endKey, split, System.currentTimeMillis());
+  }
+
+  /**
+   * Construct HRegionInfo with explicit parameters
+   * 
+   * @param tableDesc the table descriptor
+   * @param startKey first key in region
+   * @param endKey end of key range
+   * @param split true if this region has split and we have daughter regions
+   * regions that may or may not hold references to this region.
+   * @param regionid Region id to use.
+   * @throws IllegalArgumentException
+   */
+  public HRegionInfo(HTableDescriptor tableDesc, final byte [] startKey,
+    final byte [] endKey, final boolean split, final long regionid)
+  throws IllegalArgumentException {
+    if (tableDesc == null) {
+      throw new IllegalArgumentException("tableDesc cannot be null");
+    }
+    this.offLine = false;
+    this.regionId = regionid;
+    this.regionName = createRegionName(tableDesc.getName(), startKey, regionId);
+    this.regionNameStr = Bytes.toString(this.regionName);
+    this.split = split;
+    this.endKey = endKey == null? HConstants.EMPTY_END_ROW: endKey.clone();
+    this.startKey = startKey == null?
+      HConstants.EMPTY_START_ROW: startKey.clone();
+    this.tableDesc = tableDesc;
+    setHashCode();
+  }
+  
+  /**
+   * Costruct a copy of another HRegionInfo
+   * 
+   * @param other
+   */
+  public HRegionInfo(HRegionInfo other) {
+    this.endKey = other.getEndKey();
+    this.offLine = other.isOffline();
+    this.regionId = other.getRegionId();
+    this.regionName = other.getRegionName();
+    this.regionNameStr = Bytes.toString(this.regionName);
+    this.split = other.isSplit();
+    this.startKey = other.getStartKey();
+    this.tableDesc = other.getTableDesc();
+    this.hashCode = other.hashCode();
+    this.encodedName = other.getEncodedName();
+  }
+  
+  private static byte [] createRegionName(final byte [] tableName,
+      final byte [] startKey, final long regionid) {
+    return createRegionName(tableName, startKey, Long.toString(regionid));
+  }
+
+  /**
+   * Make a region name of passed parameters.
+   * @param tableName
+   * @param startKey Can be null
+   * @param id Region id.
+   * @return Region name made of passed tableName, startKey and id
+   */
+  public static byte [] createRegionName(final byte [] tableName,
+      final byte [] startKey, final String id) {
+    return createRegionName(tableName, startKey, Bytes.toBytes(id));
+  }
+  /**
+   * Make a region name of passed parameters.
+   * @param tableName
+   * @param startKey Can be null
+   * @param id Region id
+   * @return Region name made of passed tableName, startKey and id
+   */
+  public static byte [] createRegionName(final byte [] tableName,
+      final byte [] startKey, final byte [] id) {
+    byte [] b = new byte [tableName.length + 2 + id.length +
+       (startKey == null? 0: startKey.length)];
+    int offset = tableName.length;
+    System.arraycopy(tableName, 0, b, 0, offset);
+    b[offset++] = DELIMITER;
+    if (startKey != null && startKey.length > 0) {
+      System.arraycopy(startKey, 0, b, offset, startKey.length);
+      offset += startKey.length;
+    }
+    b[offset++] = DELIMITER;
+    System.arraycopy(id, 0, b, offset, id.length);
+    return b;
+  }
+  
+  /** @return the endKey */
+  public byte [] getEndKey(){
+    return endKey;
+  }
+
+  /** @return the regionId */
+  public long getRegionId(){
+    return regionId;
+  }
+
+  /**
+   * @return the regionName as an array of bytes.
+   * @see #getRegionNameAsString()
+   */
+  public byte [] getRegionName(){
+    return regionName;
+  }
+
+  /**
+   * @return Region name as a String for use in logging, etc.
+   */
+  public String getRegionNameAsString() {
+    return this.regionNameStr;
+  }
+  
+  /** @return the encoded region name */
+  public synchronized int getEncodedName() {
+    if (this.encodedName == NO_HASH) {
+      this.encodedName = encodeRegionName(this.regionName);
+    }
+    return this.encodedName;
+  }
+
+  /** @return the startKey */
+  public byte [] getStartKey(){
+    return startKey;
+  }
+
+  /** @return the tableDesc */
+  public HTableDescriptor getTableDesc(){
+    return tableDesc;
+  }
+
+  /** @return true if this is the root region */
+  public boolean isRootRegion() {
+    return this.tableDesc.isRootRegion();
+  }
+  
+  /** @return true if this is the meta table */
+  public boolean isMetaTable() {
+    return this.tableDesc.isMetaTable();
+  }
+
+  /** @return true if this region is a meta region */
+  public boolean isMetaRegion() {
+    return this.tableDesc.isMetaRegion();
+  }
+  
+  /**
+   * @return True if has been split and has daughters.
+   */
+  public boolean isSplit() {
+    return this.split;
+  }
+  
+  /**
+   * @param split set split status
+   */
+  public void setSplit(boolean split) {
+    this.split = split;
+  }
+
+  /**
+   * @return True if this region is offline.
+   */
+  public boolean isOffline() {
+    return this.offLine;
+  }
+
+  /**
+   * @param offLine set online - offline status
+   */
+  public void setOffline(boolean offLine) {
+    this.offLine = offLine;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String toString() {
+    return "REGION => {" + HConstants.NAME + " => '" +
+      this.regionNameStr +
+      "', STARTKEY => '" +
+      Bytes.toString(this.startKey) + "', ENDKEY => '" +
+      Bytes.toString(this.endKey) + 
+      "', ENCODED => " + getEncodedName() + "," +
+      (isOffline()? " OFFLINE => true,": "") + (isSplit()? " SPLIT => true,": "") +
+      " TABLE => {" + this.tableDesc.toString() + "}";
+  }
+    
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean equals(Object o) {
+    return this.compareTo(o) == 0;
+  }
+  
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int hashCode() {
+    return this.hashCode;
+  }
+
+  //
+  // Writable
+  //
+
+  /**
+   * {@inheritDoc}
+   */
+  public void write(DataOutput out) throws IOException {
+    Bytes.writeByteArray(out, endKey);
+    out.writeBoolean(offLine);
+    out.writeLong(regionId);
+    Bytes.writeByteArray(out, regionName);
+    out.writeBoolean(split);
+    Bytes.writeByteArray(out, startKey);
+    tableDesc.write(out);
+    out.writeInt(hashCode);
+  }
+  
+  /**
+   * {@inheritDoc}
+   */
+  public void readFields(DataInput in) throws IOException {
+    this.endKey = Bytes.readByteArray(in);
+    this.offLine = in.readBoolean();
+    this.regionId = in.readLong();
+    this.regionName = Bytes.readByteArray(in);
+    this.regionNameStr = Bytes.toString(this.regionName);
+    this.split = in.readBoolean();
+    this.startKey = Bytes.readByteArray(in);
+    this.tableDesc.readFields(in);
+    this.hashCode = in.readInt();
+  }
+  
+  //
+  // Comparable
+  //
+  
+  /**
+   * {@inheritDoc}
+   */
+  public int compareTo(Object o) {
+    HRegionInfo other = (HRegionInfo) o;
+    if (other == null) {
+      return 1;
+    }
+    
+    // Are regions of same table?
+    int result = this.tableDesc.compareTo(other.tableDesc);
+    if (result != 0) {
+      return result;
+    }
+
+    // Compare start keys.
+    result = Bytes.compareTo(this.startKey, other.startKey);
+    if (result != 0) {
+      return result;
+    }
+    
+    // Compare end keys.
+    return Bytes.compareTo(this.endKey, other.endKey);
+  }
+}

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HStore.java?rev=677517&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HStore.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HStore.java Thu Jul 17 00:17:26 2008
@@ -0,0 +1,1816 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util.migration.v5;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.ChangedReadersObserver;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * HStore maintains a bunch of data files.  It is responsible for maintaining 
+ * the memory/file hierarchy and for periodic flushes to disk and compacting 
+ * edits to the file.
+ *
+ * Locking and transactions are handled at a higher level.  This API should not 
+ * be called directly by any writer, but rather by an HRegion manager.
+ */
+public class HStore implements HConstants {
+  static final Log LOG = LogFactory.getLog(HStore.class);
+
+  /*
+   * Regex that will work for straight filenames and for reference names.
+   * If reference, then the regex has more than just one group.  Group 1 is
+   * this files id.  Group 2 the referenced region name, etc.
+   */
+  private static final Pattern REF_NAME_PARSER =
+    Pattern.compile("^(\\d+)(?:\\.(.+))?$");
+  
+  protected final Memcache memcache;
+  private final Path basedir;
+  private final HRegionInfo info;
+  private final HColumnDescriptor family;
+  private final SequenceFile.CompressionType compression;
+  final FileSystem fs;
+  private final HBaseConfiguration conf;
+  protected long ttl;
+
+  private final long desiredMaxFileSize;
+  private volatile long storeSize;
+
+  private final Integer flushLock = new Integer(0);
+
+  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+  final byte [] storeName;
+  private final String storeNameStr;
+
+  /*
+   * Sorted Map of readers keyed by sequence id (Most recent should be last in
+   * in list).
+   */
+  private final SortedMap<Long, HStoreFile> storefiles =
+    Collections.synchronizedSortedMap(new TreeMap<Long, HStoreFile>());
+  
+  /*
+   * Sorted Map of readers keyed by sequence id (Most recent should be last in
+   * in list).
+   */
+  private final SortedMap<Long, MapFile.Reader> readers =
+    new TreeMap<Long, MapFile.Reader>();
+
+  // The most-recent log-seq-ID that's present.  The most-recent such ID means
+  // we can ignore all log messages up to and including that ID (because they're
+  // already reflected in the TreeMaps).
+  private volatile long maxSeqId;
+  
+  private final Path compactionDir;
+  private final Integer compactLock = new Integer(0);
+  private final int compactionThreshold;
+  private final Set<ChangedReadersObserver> changedReaderObservers =
+    Collections.synchronizedSet(new HashSet<ChangedReadersObserver>());
+
+  /**
+   * An HStore is a set of zero or more MapFiles, which stretch backwards over 
+   * time.  A given HStore is responsible for a certain set of columns for a
+   * row in the HRegion.
+   *
+   * <p>The HRegion starts writing to its set of HStores when the HRegion's 
+   * memcache is flushed.  This results in a round of new MapFiles, one for
+   * each HStore.
+   *
+   * <p>There's no reason to consider append-logging at this level; all logging 
+   * and locking is handled at the HRegion level.  HStore just provides
+   * services to manage sets of MapFiles.  One of the most important of those
+   * services is MapFile-compaction services.
+   *
+   * <p>The only thing having to do with logs that HStore needs to deal with is
+   * the reconstructionLog.  This is a segment of an HRegion's log that might
+   * NOT be present upon startup.  If the param is NULL, there's nothing to do.
+   * If the param is non-NULL, we need to process the log to reconstruct
+   * a TreeMap that might not have been written to disk before the process
+   * died.
+   *
+   * <p>It's assumed that after this constructor returns, the reconstructionLog
+   * file will be deleted (by whoever has instantiated the HStore).
+   *
+   * @param basedir qualified path under which the region directory lives
+   * @param info HRegionInfo for this region
+   * @param family HColumnDescriptor for this column
+   * @param fs file system object
+   * @param reconstructionLog existing log file to apply if any
+   * @param conf configuration object
+   * @param reporter Call on a period so hosting server can report we're
+   * making progress to master -- otherwise master might think region deploy
+   * failed.  Can be null.
+   * @throws IOException
+   */
+  protected HStore(Path basedir, HRegionInfo info, HColumnDescriptor family,
+      FileSystem fs, Path reconstructionLog, HBaseConfiguration conf,
+      final Progressable reporter)
+  throws IOException {  
+    this.basedir = basedir;
+    this.info = info;
+    this.family = family;
+    this.fs = fs;
+    this.conf = conf;
+    this.ttl = family.getTimeToLive();
+    if (ttl != HConstants.FOREVER)
+      this.ttl *= 1000;
+    this.memcache = new Memcache(this.ttl);
+    this.compactionDir = HRegion.getCompactionDir(basedir);
+    this.storeName = Bytes.toBytes(this.info.getEncodedName() + "/" +
+      Bytes.toString(this.family.getName()));
+    this.storeNameStr = Bytes.toString(this.storeName);
+    
+    // By default, we compact if an HStore has more than
+    // MIN_COMMITS_FOR_COMPACTION map files
+    this.compactionThreshold =
+      conf.getInt("hbase.hstore.compactionThreshold", 3);
+    
+    // By default we split region if a file > DEFAULT_MAX_FILE_SIZE.
+    this.desiredMaxFileSize =
+      conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE);
+    this.storeSize = 0L;
+
+    if (family.getCompression() == HColumnDescriptor.CompressionType.BLOCK) {
+      this.compression = SequenceFile.CompressionType.BLOCK;
+    } else if (family.getCompression() ==
+      HColumnDescriptor.CompressionType.RECORD) {
+      this.compression = SequenceFile.CompressionType.RECORD;
+    } else {
+      this.compression = SequenceFile.CompressionType.NONE;
+    }
+    
+    Path mapdir = HStoreFile.getMapDir(basedir, info.getEncodedName(),
+        family.getName());
+    if (!fs.exists(mapdir)) {
+      fs.mkdirs(mapdir);
+    }
+    Path infodir = HStoreFile.getInfoDir(basedir, info.getEncodedName(),
+        family.getName());
+    if (!fs.exists(infodir)) {
+      fs.mkdirs(infodir);
+    }
+    
+    // Go through the 'mapdir' and 'infodir' together, make sure that all 
+    // MapFiles are in a reliable state.  Every entry in 'mapdir' must have a 
+    // corresponding one in 'loginfodir'. Without a corresponding log info
+    // file, the entry in 'mapdir' must be deleted.
+    // loadHStoreFiles also computes the max sequence id internally.
+    this.maxSeqId = -1L;
+    this.storefiles.putAll(loadHStoreFiles(infodir, mapdir));
+    if (LOG.isDebugEnabled() && this.storefiles.size() > 0) {
+      LOG.debug("Loaded " + this.storefiles.size() + " file(s) in hstore " +
+        Bytes.toString(this.storeName) + ", max sequence id " + this.maxSeqId);
+    }
+    
+    try {
+      doReconstructionLog(reconstructionLog, maxSeqId, reporter);
+    } catch (EOFException e) {
+      // Presume we got here because of lack of HADOOP-1700; for now keep going
+      // but this is probably not what we want long term.  If we got here there
+      // has been data-loss
+      LOG.warn("Exception processing reconstruction log " + reconstructionLog +
+        " opening " + this.storeName +
+        " -- continuing.  Probably lack-of-HADOOP-1700 causing DATA LOSS!", e);
+    } catch (IOException e) {
+      // Presume we got here because of some HDFS issue. Don't just keep going.
+      // Fail to open the HStore.  Probably means we'll fail over and over
+      // again until human intervention but alternative has us skipping logs
+      // and losing edits: HBASE-642.
+      LOG.warn("Exception processing reconstruction log " + reconstructionLog +
+        " opening " + this.storeName, e);
+      throw e;
+    }
+
+    // Finally, start up all the map readers! (There could be more than one
+    // since we haven't compacted yet.)
+    boolean first = true;
+    for(Map.Entry<Long, HStoreFile> e: this.storefiles.entrySet()) {
+      MapFile.Reader r = null;
+      if (first) {
+        // Use a block cache (if configured) for the first reader only
+        // so as to control memory usage.
+        r = e.getValue().getReader(this.fs, this.family.isBloomFilterEnabled(),
+          family.isBlockCacheEnabled());
+        first = false;
+      } else {
+        r = e.getValue().getReader(this.fs, this.family.isBloomFilterEnabled(),
+            false);
+      }
+      this.readers.put(e.getKey(), r);
+    }
+  }
+
+  HColumnDescriptor getFamily() {
+    return this.family;
+  }
+  
+  long getMaxSequenceId() {
+    return this.maxSeqId;
+  }
+  
+  /*
+   * Read the reconstructionLog to see whether we need to build a brand-new 
+   * MapFile out of non-flushed log entries.  
+   *
+   * We can ignore any log message that has a sequence ID that's equal to or 
+   * lower than maxSeqID.  (Because we know such log messages are already 
+   * reflected in the MapFiles.)
+   */
+  private void doReconstructionLog(final Path reconstructionLog,
+    final long maxSeqID, final Progressable reporter)
+  throws UnsupportedEncodingException, IOException {
+    if (reconstructionLog == null || !fs.exists(reconstructionLog)) {
+      // Nothing to do.
+      return;
+    }
+    // Check its not empty.
+    FileStatus[] stats = fs.listStatus(reconstructionLog);
+    if (stats == null || stats.length == 0) {
+      LOG.warn("Passed reconstruction log " + reconstructionLog + " is zero-length");
+      return;
+    }
+    long maxSeqIdInLog = -1;
+    TreeMap<HStoreKey, byte []> reconstructedCache =
+      new TreeMap<HStoreKey, byte []>();
+      
+    SequenceFile.Reader logReader = new SequenceFile.Reader(this.fs,
+        reconstructionLog, this.conf);
+    
+    try {
+      HLogKey key = new HLogKey();
+      HLogEdit val = new HLogEdit();
+      long skippedEdits = 0;
+      long editsCount = 0;
+      // How many edits to apply before we send a progress report.
+      int reportInterval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
+      while (logReader.next(key, val)) {
+        maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum());
+        if (key.getLogSeqNum() <= maxSeqID) {
+          skippedEdits++;
+          continue;
+        }
+        // Check this edit is for me. Also, guard against writing
+        // METACOLUMN info such as HBASE::CACHEFLUSH entries
+        byte [] column = val.getColumn();
+        if (Bytes.equals(column, HLog.METACOLUMN)
+            || !Bytes.equals(key.getRegionName(), info.getRegionName())
+            || !HStoreKey.matchingFamily(family.getName(), column)) {
+          continue;
+        }
+        HStoreKey k = new HStoreKey(key.getRow(), column, val.getTimestamp());
+        reconstructedCache.put(k, val.getVal());
+        editsCount++;
+        // Every 2k edits, tell the reporter we're making progress.
+        // Have seen 60k edits taking 3minutes to complete.
+        if (reporter != null && (editsCount % reportInterval) == 0) {
+          reporter.progress();
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits +
+          " because sequence id <= " + maxSeqID);
+      }
+    } finally {
+      logReader.close();
+    }
+    
+    if (reconstructedCache.size() > 0) {
+      // We create a "virtual flush" at maxSeqIdInLog+1.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("flushing reconstructionCache");
+      }
+      internalFlushCache(reconstructedCache, maxSeqIdInLog + 1);
+    }
+  }
+  
+  /*
+   * Creates a series of HStoreFiles loaded from the given directory.
+   * There must be a matching 'mapdir' and 'loginfo' pair of files.
+   * If only one exists, we'll delete it.  Does other consistency tests
+   * checking files are not zero, etc.
+   *
+   * @param infodir qualified path for info file directory
+   * @param mapdir qualified path for map file directory
+   * @throws IOException
+   */
+  private SortedMap<Long, HStoreFile> loadHStoreFiles(Path infodir, Path mapdir)
+  throws IOException {
+    // Look first at info files.  If a reference, these contain info we need
+    // to create the HStoreFile.
+    FileStatus infofiles[] = fs.listStatus(infodir);
+    SortedMap<Long, HStoreFile> results = new TreeMap<Long, HStoreFile>();
+    ArrayList<Path> mapfiles = new ArrayList<Path>(infofiles.length);
+    for (int i = 0; i < infofiles.length; i++) {
+      Path p = infofiles[i].getPath();
+      // Check for empty info file.  Should never be the case but can happen
+      // after data loss in hdfs for whatever reason (upgrade, etc.): HBASE-646
+      if (this.fs.getFileStatus(p).getLen() <= 0) {
+        LOG.warn("Skipping " + p + " because its empty.  DATA LOSS?  Can " +
+          "this scenario be repaired?  HBASE-646");
+        continue;
+      }
+
+      Matcher m = REF_NAME_PARSER.matcher(p.getName());
+      /*
+       *  *  *  *  *  N O T E  *  *  *  *  *
+       *  
+       *  We call isReference(Path, Matcher) here because it calls
+       *  Matcher.matches() which must be called before Matcher.group(int)
+       *  and we don't want to call Matcher.matches() twice.
+       *  
+       *  *  *  *  *  N O T E  *  *  *  *  *
+       */
+      boolean isReference = isReference(p, m);
+      long fid = Long.parseLong(m.group(1));
+      
+      HStoreFile curfile = null;
+      HStoreFile.Reference reference = null;
+      if (isReference) {
+        reference = HStoreFile.readSplitInfo(p, fs);
+      }
+      curfile = new HStoreFile(conf, fs, basedir, info.getEncodedName(),
+        family.getName(), fid, reference);
+      storeSize += curfile.length();
+      long storeSeqId = -1;
+      try {
+        storeSeqId = curfile.loadInfo(fs);
+        if (storeSeqId > this.maxSeqId) {
+          this.maxSeqId = storeSeqId;
+        }
+      } catch (IOException e) {
+        // If the HSTORE_LOGINFOFILE doesn't contain a number, just ignore it.
+        // That means it was built prior to the previous run of HStore, and so
+        // it cannot contain any updates also contained in the log.
+        LOG.info("HSTORE_LOGINFOFILE " + curfile +
+          " does not contain a sequence number - ignoring");
+      }
+      Path mapfile = curfile.getMapFilePath();
+      if (!fs.exists(mapfile)) {
+        fs.delete(curfile.getInfoFilePath(), false);
+        LOG.warn("Mapfile " + mapfile.toString() + " does not exist. " +
+          "Cleaned up info file.  Continuing...Probable DATA LOSS!!!");
+        continue;
+      }
+      if (isEmptyDataFile(mapfile)) {
+        curfile.delete();
+        // We can have empty data file if data loss in hdfs.
+        LOG.warn("Mapfile " + mapfile.toString() + " has empty data. " +
+          "Deleting.  Continuing...Probable DATA LOSS!!!  See HBASE-646.");
+        continue;
+      }
+      if (isEmptyIndexFile(mapfile)) {
+        try {
+          // Try fixing this file.. if we can.  Use the hbase version of fix.
+          // Need to remove the old index file first else fix won't go ahead.
+          this.fs.delete(new Path(mapfile, MapFile.INDEX_FILE_NAME), false);
+          long count = MapFile.fix(this.fs, mapfile, HStoreFile.HbaseMapFile.KEY_CLASS,
+            HStoreFile.HbaseMapFile.VALUE_CLASS, false, this.conf);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Fixed index on " + mapfile.toString() + "; had " +
+              count + " entries");
+          }
+        } catch (Exception e) {
+          LOG.warn("Failed fix of " + mapfile.toString() +
+            "...continuing; Probable DATA LOSS!!!", e);
+          continue;
+        }
+      }
+      
+      // TODO: Confirm referent exists.
+      
+      // Found map and sympathetic info file.  Add this hstorefile to result.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("loaded " + FSUtils.getPath(p) + ", isReference=" +
+          isReference + ", sequence id=" + storeSeqId);
+      }
+      results.put(Long.valueOf(storeSeqId), curfile);
+      // Keep list of sympathetic data mapfiles for cleaning info dir in next
+      // section.  Make sure path is fully qualified for compare.
+      mapfiles.add(mapfile);
+    }
+    
+    // List paths by experience returns fully qualified names -- at least when
+    // running on a mini hdfs cluster.
+    FileStatus datfiles[] = fs.listStatus(mapdir);
+    for (int i = 0; i < datfiles.length; i++) {
+      Path p = datfiles[i].getPath();
+      // If does not have sympathetic info file, delete.
+      if (!mapfiles.contains(fs.makeQualified(p))) {
+        fs.delete(p, true);
+      }
+    }
+    return results;
+  }
+
+  /* 
+   * @param mapfile
+   * @return True if the passed mapfile has a zero-length data component (its
+   * broken).
+   * @throws IOException
+   */
+  private boolean isEmptyDataFile(final Path mapfile)
+  throws IOException {
+    // Mapfiles are made of 'data' and 'index' files.  Confirm 'data' is
+    // non-null if it exists (may not have been written to yet).
+    return isEmptyFile(new Path(mapfile, MapFile.DATA_FILE_NAME));
+  }
+
+  /* 
+   * @param mapfile
+   * @return True if the passed mapfile has a zero-length index component (its
+   * broken).
+   * @throws IOException
+   */
+  private boolean isEmptyIndexFile(final Path mapfile)
+  throws IOException {
+    // Mapfiles are made of 'data' and 'index' files.  Confirm 'data' is
+    // non-null if it exists (may not have been written to yet).
+    return isEmptyFile(new Path(mapfile, MapFile.INDEX_FILE_NAME));
+  }
+
+  /* 
+   * @param mapfile
+   * @return True if the passed mapfile has a zero-length index component (its
+   * broken).
+   * @throws IOException
+   */
+  private boolean isEmptyFile(final Path f)
+  throws IOException {
+    return this.fs.exists(f) &&
+      this.fs.getFileStatus(f).getLen() == 0;
+  }
+
+  /**
+   * Adds a value to the memcache
+   * 
+   * @param key
+   * @param value
+   * @return memcache size delta
+   */
+  protected long add(HStoreKey key, byte[] value) {
+    lock.readLock().lock();
+    try {
+      return this.memcache.add(key, value);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+  
+  /**
+   * Close all the MapFile readers
+   * 
+   * We don't need to worry about subsequent requests because the HRegion holds
+   * a write lock that will prevent any more reads or writes.
+   * 
+   * @throws IOException
+   */
+  List<HStoreFile> close() throws IOException {
+    ArrayList<HStoreFile> result = null;
+    this.lock.writeLock().lock();
+    try {
+      for (MapFile.Reader reader: this.readers.values()) {
+        reader.close();
+      }
+      synchronized (this.storefiles) {
+        result = new ArrayList<HStoreFile>(storefiles.values());
+      }
+      LOG.debug("closed " + this.storeNameStr);
+      return result;
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+  }
+  
+  //////////////////////////////////////////////////////////////////////////////
+  // Flush changes to disk
+  //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Snapshot this stores memcache.  Call before running
+   * {@link #flushCache(long)} so it has some work to do.
+   */
+  void snapshot() {
+    this.memcache.snapshot();
+  }
+  
+  /**
+   * Write out current snapshot.  Presumes {@link #snapshot()} has been called
+   * previously.
+   * @param logCacheFlushId flush sequence number
+   * @return true if a compaction is needed
+   * @throws IOException
+   */
+  boolean flushCache(final long logCacheFlushId) throws IOException {
+    // Get the snapshot to flush.  Presumes that a call to
+    // this.memcache.snapshot() has happened earlier up in the chain.
+    SortedMap<HStoreKey, byte []> cache = this.memcache.getSnapshot();
+    boolean compactionNeeded = internalFlushCache(cache, logCacheFlushId);
+    // If an exception happens flushing, we let it out without clearing
+    // the memcache snapshot.  The old snapshot will be returned when we say
+    // 'snapshot', the next time flush comes around.
+    this.memcache.clearSnapshot(cache);
+    return compactionNeeded;
+  }
+  
+  private boolean internalFlushCache(SortedMap<HStoreKey, byte []> cache,
+      long logCacheFlushId) throws IOException {
+    long flushed = 0;
+    // Don't flush if there are no entries.
+    if (cache.size() == 0) {
+      return false;
+    }
+    
+    // TODO:  We can fail in the below block before we complete adding this
+    // flush to list of store files.  Add cleanup of anything put on filesystem
+    // if we fail.
+    synchronized(flushLock) {
+      long now = System.currentTimeMillis();
+      // A. Write the Maps out to the disk
+      HStoreFile flushedFile = new HStoreFile(conf, fs, basedir,
+        info.getEncodedName(),  family.getName(), -1L, null);
+      MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression,
+        this.family.isBloomFilterEnabled(), cache.size());
+      
+      // Here we tried picking up an existing HStoreFile from disk and
+      // interlacing the memcache flush compacting as we go.  The notion was
+      // that interlacing would take as long as a pure flush with the added
+      // benefit of having one less file in the store.  Experiments showed that
+      // it takes two to three times the amount of time flushing -- more column
+      // families makes it so the two timings come closer together -- but it
+      // also complicates the flush. The code was removed.  Needed work picking
+      // which file to interlace (favor references first, etc.)
+      //
+      // Related, looks like 'merging compactions' in BigTable paper interlaces
+      // a memcache flush.  We don't.
+      int entries = 0;
+      try {
+        for (Map.Entry<HStoreKey, byte []> es: cache.entrySet()) {
+          HStoreKey curkey = es.getKey();
+          byte[] bytes = es.getValue();
+          if (HStoreKey.matchingFamily(this.family.getName(), curkey.getColumn())) {
+            if (ttl == HConstants.FOREVER ||
+                  now < curkey.getTimestamp() + ttl) {
+              entries++;
+              out.append(curkey, new ImmutableBytesWritable(bytes));
+              flushed += curkey.getSize() + (bytes == null ? 0 : bytes.length);
+            } else {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("internalFlushCache: " + curkey +
+                  ": expired, skipped");
+              }
+            }
+          }
+        }
+      } finally {
+        out.close();
+      }
+      long newStoreSize = flushedFile.length();
+      storeSize += newStoreSize;
+
+      // B. Write out the log sequence number that corresponds to this output
+      // MapFile.  The MapFile is current up to and including the log seq num.
+      flushedFile.writeInfo(fs, logCacheFlushId);
+      
+      // C. Finally, make the new MapFile available.
+      updateReaders(logCacheFlushId, flushedFile);
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Added " + FSUtils.getPath(flushedFile.getMapFilePath()) +
+          " with " + entries +
+          " entries, sequence id " + logCacheFlushId + ", data size " +
+          StringUtils.humanReadableInt(flushed) + ", file size " +
+          StringUtils.humanReadableInt(newStoreSize));
+      }
+    }
+    return storefiles.size() >= compactionThreshold;
+  }
+  
+  /*
+   * Change readers adding into place the Reader produced by this new flush.
+   * @param logCacheFlushId
+   * @param flushedFile
+   * @throws IOException
+   */
+  private void updateReaders(final long logCacheFlushId,
+      final HStoreFile flushedFile)
+  throws IOException {
+    this.lock.writeLock().lock();
+    try {
+      Long flushid = Long.valueOf(logCacheFlushId);
+      // Open the map file reader.
+      this.readers.put(flushid,
+        flushedFile.getReader(this.fs, this.family.isBloomFilterEnabled(),
+        this.family.isBlockCacheEnabled()));
+      this.storefiles.put(flushid, flushedFile);
+      // Tell listeners of the change in readers.
+      notifyChangedReadersObservers();
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+  }
+  
+  /*
+   * Notify all observers that set of Readers has changed.
+   * @throws IOException
+   */
+  private void notifyChangedReadersObservers() throws IOException {
+    synchronized (this.changedReaderObservers) {
+      for (ChangedReadersObserver o: this.changedReaderObservers) {
+        o.updateReaders();
+      }
+    }
+  }
+  
+  /*
+   * @param o Observer who wants to know about changes in set of Readers
+   */
+  void addChangedReaderObserver(ChangedReadersObserver o) {
+    this.changedReaderObservers.add(o);
+  }
+  
+  /*
+   * @param o Observer no longer interested in changes in set of Readers.
+   */
+  void deleteChangedReaderObserver(ChangedReadersObserver o) {
+    if (!this.changedReaderObservers.remove(o)) {
+      LOG.warn("Not in set" + o);
+    }
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Compaction
+  //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Compact the back-HStores.  This method may take some time, so the calling 
+   * thread must be able to block for long periods.
+   * 
+   * <p>During this time, the HStore can work as usual, getting values from
+   * MapFiles and writing new MapFiles from the Memcache.
+   * 
+   * Existing MapFiles are not destroyed until the new compacted TreeMap is 
+   * completely written-out to disk.
+   *
+   * The compactLock prevents multiple simultaneous compactions.
+   * The structureLock prevents us from interfering with other write operations.
+   * 
+   * We don't want to hold the structureLock for the whole time, as a compact() 
+   * can be lengthy and we want to allow cache-flushes during this period.
+   * 
+   * @param force True to force a compaction regardless of thresholds (Needed
+   * by merge).
+   * @return mid key if a split is needed, null otherwise
+   * @throws IOException
+   */
+  StoreSize compact(final boolean force) throws IOException {
+    synchronized (compactLock) {
+      long maxId = -1;
+      int nrows = -1;
+      List<HStoreFile> filesToCompact = null;
+      synchronized (storefiles) {
+        if (this.storefiles.size() <= 0) {
+          return null;
+        }
+        filesToCompact = new ArrayList<HStoreFile>(this.storefiles.values());
+
+        // The max-sequenceID in any of the to-be-compacted TreeMaps is the 
+        // last key of storefiles.
+        maxId = this.storefiles.lastKey().longValue();
+      }
+      if (!force && filesToCompact.size() < compactionThreshold) {
+        return checkSplit();
+      }
+      if (!fs.exists(compactionDir) && !fs.mkdirs(compactionDir)) {
+        LOG.warn("Mkdir on " + compactionDir.toString() + " failed");
+        return checkSplit();
+      }
+      /*
+       * We create a new list of MapFile.Reader objects so we don't screw up the
+       * caching associated with the currently-loaded ones. Our iteration-based
+       * access pattern is practically designed to ruin the cache.
+       */
+      List<MapFile.Reader> readers = new ArrayList<MapFile.Reader>();
+      for (HStoreFile file: filesToCompact) {
+        try {
+          HStoreFile.BloomFilterMapFile.Reader reader =
+            file.getReader(fs, false, false);
+          readers.add(reader);
+          
+          // Compute the size of the new bloomfilter if needed
+          if (this.family.isBloomFilterEnabled()) {
+            nrows += reader.getBloomFilterSize();
+          }
+        } catch (IOException e) {
+          // Add info about which file threw exception. It may not be in the
+          // exception message so output a message here where we know the
+          // culprit.
+          LOG.warn("Failed with " + e.toString() + ": " + file.toString());
+          closeCompactionReaders(readers);
+          throw e;
+        }
+      }
+      
+      // Storefiles are keyed by sequence id. The oldest file comes first.
+      // We need to return out of here a List that has the newest file first.
+      Collections.reverse(readers);
+
+      // Step through them, writing to the brand-new MapFile
+      HStoreFile compactedOutputFile = new HStoreFile(conf, fs, 
+          this.compactionDir, info.getEncodedName(), family.getName(),
+          -1L, null);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("started compaction of " + readers.size() + " files into " +
+          FSUtils.getPath(compactedOutputFile.getMapFilePath()));
+      }
+      MapFile.Writer writer = compactedOutputFile.getWriter(this.fs,
+        this.compression, this.family.isBloomFilterEnabled(), nrows);
+      try {
+        compactHStoreFiles(writer, readers);
+      } finally {
+        writer.close();
+      }
+
+      // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
+      compactedOutputFile.writeInfo(fs, maxId);
+
+      // Move the compaction into place.
+      completeCompaction(filesToCompact, compactedOutputFile);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Completed compaction of " + this.storeNameStr +
+            " store size is " + StringUtils.humanReadableInt(storeSize));
+      }
+    }
+    return checkSplit();
+  }
+  
+  /*
+   * Compact a list of MapFile.Readers into MapFile.Writer.
+   * 
+   * We work by iterating through the readers in parallel. We always increment
+   * the lowest-ranked one.
+   * Updates to a single row/column will appear ranked by timestamp. This allows
+   * us to throw out deleted values or obsolete versions.
+   */
+  private void compactHStoreFiles(final MapFile.Writer compactedOut,
+      final List<MapFile.Reader> readers) throws IOException {
+    
+    MapFile.Reader[] rdrs = readers.toArray(new MapFile.Reader[readers.size()]);
+    try {
+      HStoreKey[] keys = new HStoreKey[rdrs.length];
+      ImmutableBytesWritable[] vals = new ImmutableBytesWritable[rdrs.length];
+      boolean[] done = new boolean[rdrs.length];
+      for(int i = 0; i < rdrs.length; i++) {
+        keys[i] = new HStoreKey();
+        vals[i] = new ImmutableBytesWritable();
+        done[i] = false;
+      }
+
+      // Now, advance through the readers in order.  This will have the
+      // effect of a run-time sort of the entire dataset.
+      int numDone = 0;
+      for(int i = 0; i < rdrs.length; i++) {
+        rdrs[i].reset();
+        done[i] = ! rdrs[i].next(keys[i], vals[i]);
+        if(done[i]) {
+          numDone++;
+        }
+      }
+
+      long now = System.currentTimeMillis();
+      int timesSeen = 0;
+      byte [] lastRow = null;
+      byte [] lastColumn = null;
+      // Map of a row deletes keyed by column with a list of timestamps for value
+      Map<byte [], List<Long>> deletes = null;
+      while (numDone < done.length) {
+        // Find the reader with the smallest key.  If two files have same key
+        // but different values -- i.e. one is delete and other is non-delete
+        // value -- we will find the first, the one that was written later and
+        // therefore the one whose value should make it out to the compacted
+        // store file.
+        int smallestKey = -1;
+        for(int i = 0; i < rdrs.length; i++) {
+          if(done[i]) {
+            continue;
+          }
+          if(smallestKey < 0) {
+            smallestKey = i;
+          } else {
+            if(keys[i].compareTo(keys[smallestKey]) < 0) {
+              smallestKey = i;
+            }
+          }
+        }
+
+        // Reflect the current key/val in the output
+        HStoreKey sk = keys[smallestKey];
+        if (Bytes.equals(lastRow, sk.getRow())
+            && Bytes.equals(lastColumn, sk.getColumn())) {
+          timesSeen++;
+        } else {
+          timesSeen = 0;
+          // We are on to a new row.  Create a new deletes list.
+          deletes = new TreeMap<byte [], List<Long>>(Bytes.BYTES_COMPARATOR);
+        }
+
+        byte [] value = (vals[smallestKey] == null)?
+          null: vals[smallestKey].get();
+        if (!isDeleted(sk, value, false, deletes) &&
+            timesSeen <= family.getMaxVersions()) {
+          // Keep old versions until we have maxVersions worth.
+          // Then just skip them.
+          if (sk.getRow().length != 0 && sk.getColumn().length != 0) {
+            // Only write out objects which have a non-zero length key and
+            // value
+            if (ttl == HConstants.FOREVER || now < sk.getTimestamp() + ttl) {
+              compactedOut.append(sk, vals[smallestKey]);
+            } else {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("compactHStoreFiles: " + sk + ": expired, deleted");
+              }
+            }
+          }
+        }
+
+        // Update last-seen items
+        lastRow = sk.getRow();
+        lastColumn = sk.getColumn();
+
+        // Advance the smallest key.  If that reader's all finished, then 
+        // mark it as done.
+        if(!rdrs[smallestKey].next(keys[smallestKey],
+            vals[smallestKey])) {
+          done[smallestKey] = true;
+          rdrs[smallestKey].close();
+          rdrs[smallestKey] = null;
+          numDone++;
+        }
+      }
+    } finally {
+      closeCompactionReaders(readers);
+    }
+  }
+  
+  private void closeCompactionReaders(final List<MapFile.Reader> rdrs) {
+    for (MapFile.Reader r: rdrs) {
+      try {
+        r.close();
+      } catch (IOException e) {
+        LOG.warn("Exception closing reader for " + this.storeNameStr, e);
+      }
+    }
+  }
+
+  /*
+   * Check if this is cell is deleted.
+   * If a memcache and a deletes, check key does not have an entry filled.
+   * Otherwise, check value is not the <code>HGlobals.deleteBytes</code> value.
+   * If passed value IS deleteBytes, then it is added to the passed
+   * deletes map.
+   * @param hsk
+   * @param value
+   * @param checkMemcache true if the memcache should be consulted
+   * @param deletes Map keyed by column with a value of timestamp. Can be null.
+   * If non-null and passed value is HGlobals.deleteBytes, then we add to this
+   * map.
+   * @return True if this is a deleted cell.  Adds the passed deletes map if
+   * passed value is HGlobals.deleteBytes.
+  */
+  private boolean isDeleted(final HStoreKey hsk, final byte [] value,
+      final boolean checkMemcache, final Map<byte [], List<Long>> deletes) {
+    if (checkMemcache && memcache.isDeleted(hsk)) {
+      return true;
+    }
+    List<Long> timestamps =
+      (deletes == null) ? null: deletes.get(hsk.getColumn());
+    if (timestamps != null &&
+        timestamps.contains(Long.valueOf(hsk.getTimestamp()))) {
+      return true;
+    }
+    if (value == null) {
+      // If a null value, shouldn't be in here.  Mark it as deleted cell.
+      return true;
+    }
+    if (!HLogEdit.isDeleted(value)) {
+      return false;
+    }
+    // Cell has delete value.  Save it into deletes.
+    if (deletes != null) {
+      if (timestamps == null) {
+        timestamps = new ArrayList<Long>();
+        deletes.put(hsk.getColumn(), timestamps);
+      }
+      // We know its not already in the deletes array else we'd have returned
+      // earlier so no need to test if timestamps already has this value.
+      timestamps.add(Long.valueOf(hsk.getTimestamp()));
+    }
+    return true;
+  }
+  
+  /*
+   * It's assumed that the compactLock  will be acquired prior to calling this 
+   * method!  Otherwise, it is not thread-safe!
+   *
+   * It works by processing a compaction that's been written to disk.
+   * 
+   * <p>It is usually invoked at the end of a compaction, but might also be
+   * invoked at HStore startup, if the prior execution died midway through.
+   * 
+   * <p>Moving the compacted TreeMap into place means:
+   * <pre>
+   * 1) Moving the new compacted MapFile into place
+   * 2) Unload all replaced MapFiles, close and collect list to delete.
+   * 3) Loading the new TreeMap.
+   * 4) Compute new store size
+   * </pre>
+   * 
+   * @param compactedFiles list of files that were compacted
+   * @param compactedFile HStoreFile that is the result of the compaction
+   * @throws IOException
+   */
+  private void completeCompaction(final List<HStoreFile> compactedFiles,
+    final HStoreFile compactedFile)
+  throws IOException {
+    this.lock.writeLock().lock();
+    try {
+      // 1. Moving the new MapFile into place.
+      HStoreFile finalCompactedFile = new HStoreFile(conf, fs, basedir,
+        info.getEncodedName(), family.getName(), -1, null);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("moving " + FSUtils.getPath(compactedFile.getMapFilePath()) +
+          " to " + FSUtils.getPath(finalCompactedFile.getMapFilePath()));
+      }
+      if (!compactedFile.rename(this.fs, finalCompactedFile)) {
+        LOG.error("Failed move of compacted file " +
+          finalCompactedFile.getMapFilePath().toString());
+        return;
+      }
+
+      // 2. Unload all replaced MapFiles, close and collect list to delete.
+      synchronized (storefiles) {
+        Map<Long, HStoreFile> toDelete = new HashMap<Long, HStoreFile>();
+        for (Map.Entry<Long, HStoreFile> e : this.storefiles.entrySet()) {
+          if (!compactedFiles.contains(e.getValue())) {
+            continue;
+          }
+          Long key = e.getKey();
+          MapFile.Reader reader = this.readers.remove(key);
+          if (reader != null) {
+            reader.close();
+          }
+          toDelete.put(key, e.getValue());
+        }
+
+        try {
+          // 3. Loading the new TreeMap.
+          // Change this.storefiles so it reflects new state but do not
+          // delete old store files until we have sent out notification of
+          // change in case old files are still being accessed by outstanding
+          // scanners.
+          for (Long key : toDelete.keySet()) {
+            this.storefiles.remove(key);
+          }
+          // Add new compacted Reader and store file.
+          Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
+          this.readers.put(orderVal,
+              // Use a block cache (if configured) for this reader since
+              // it is the only one.
+              finalCompactedFile.getReader(this.fs,
+                  this.family.isBloomFilterEnabled(),
+                  this.family.isBlockCacheEnabled()));
+          this.storefiles.put(orderVal, finalCompactedFile);
+          // Tell observers that list of Readers has changed.
+          notifyChangedReadersObservers();
+          // Finally, delete old store files.
+          for (HStoreFile hsf : toDelete.values()) {
+            hsf.delete();
+          }
+        } catch (IOException e) {
+          e = RemoteExceptionHandler.checkIOException(e);
+          LOG.error("Failed replacing compacted files for " +
+            this.storeNameStr +
+            ". Compacted file is " + finalCompactedFile.toString() +
+            ".  Files replaced are " + compactedFiles.toString() +
+            " some of which may have been already removed", e);
+        }
+        // 4. Compute new store size
+        storeSize = 0L;
+        for (HStoreFile hsf : storefiles.values()) {
+          storeSize += hsf.length();
+        }
+      }
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+  }
+
+  // ////////////////////////////////////////////////////////////////////////////
+  // Accessors.
+  // (This is the only section that is directly useful!)
+  //////////////////////////////////////////////////////////////////////////////
+  
+  /**
+   * Return all the available columns for the given key.  The key indicates a 
+   * row and timestamp, but not a column name.
+   *
+   * The returned object should map column names to Cells.
+   */
+  void getFull(HStoreKey key, final Set<byte []> columns,
+      Map<byte [], Cell> results)
+  throws IOException {
+    Map<byte [], Long> deletes =
+      new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
+
+    // if the key is null, we're not even looking for anything. return.
+    if (key == null) {
+      return;
+    }
+    
+    this.lock.readLock().lock();
+    
+    // get from the memcache first.
+    memcache.getFull(key, columns, deletes, results);
+    
+    try {
+      MapFile.Reader[] maparray = getReaders();
+      
+      // examine each mapfile
+      for (int i = maparray.length - 1; i >= 0; i--) {
+        MapFile.Reader map = maparray[i];
+        
+        // synchronize on the map so that no one else iterates it at the same 
+        // time
+        getFullFromMapFile(map, key, columns, deletes, results);
+      }
+      
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+  
+  private void getFullFromMapFile(MapFile.Reader map, HStoreKey key, 
+    Set<byte []> columns, Map<byte [], Long> deletes, Map<byte [], Cell> results) 
+  throws IOException {
+    synchronized(map) {
+      long now = System.currentTimeMillis();
+
+      // seek back to the beginning
+      map.reset();
+      
+      // seek to the closest key that should match the row we're looking for
+      ImmutableBytesWritable readval = new ImmutableBytesWritable();
+      HStoreKey readkey = (HStoreKey)map.getClosest(key, readval);
+      if (readkey == null) {
+        return;
+      }
+      do {
+        byte [] readcol = readkey.getColumn();
+        
+        // if we're looking for this column (or all of them), and there isn't 
+        // already a value for this column in the results map, and the key we 
+        // just read matches, then we'll consider it
+        if ((columns == null || columns.contains(readcol)) 
+          && !results.containsKey(readcol)
+          && key.matchesWithoutColumn(readkey)) {
+          // if the value of the cell we're looking at right now is a delete, 
+          // we need to treat it differently
+          if(HLogEdit.isDeleted(readval.get())) {
+            // if it's not already recorded as a delete or recorded with a more
+            // recent delete timestamp, record it for later
+            if (!deletes.containsKey(readcol) 
+              || deletes.get(readcol).longValue() < readkey.getTimestamp()) {
+              deletes.put(readcol, readkey.getTimestamp());              
+            }
+          } else if (!(deletes.containsKey(readcol) 
+            && deletes.get(readcol).longValue() >= readkey.getTimestamp()) ) {
+            // So the cell itself isn't a delete, but there may be a delete 
+            // pending from earlier in our search. Only record this result if
+            // there aren't any pending deletes.
+            if (!(deletes.containsKey(readcol) &&
+                deletes.get(readcol).longValue() >= readkey.getTimestamp())) {
+              if (ttl == HConstants.FOREVER ||
+                      now < readkey.getTimestamp() + ttl) {
+                results.put(readcol, 
+                  new Cell(readval.get(), readkey.getTimestamp()));
+                // need to reinstantiate the readval so we can reuse it, 
+                // otherwise next iteration will destroy our result
+                readval = new ImmutableBytesWritable();
+              } else {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("getFullFromMapFile: " + readkey + ": expired, skipped");
+                }
+              }
+            }
+          }
+        } else if (Bytes.compareTo(key.getRow(), readkey.getRow()) < 0) {
+          // if we've crossed into the next row, then we can just stop 
+          // iterating
+          break;
+        }
+        
+      } while(map.next(readkey, readval));
+    }
+  }
+  
+  MapFile.Reader [] getReaders() {
+    return this.readers.values().
+      toArray(new MapFile.Reader[this.readers.size()]);
+  }
+
+  /**
+   * Get the value for the indicated HStoreKey.  Grab the target value and the 
+   * previous <code>numVersions - 1</code> values, as well.
+   *
+   * Use {@link HConstants.ALL_VERSIONS} to retrieve all versions.
+   * @param key
+   * @param numVersions Number of versions to fetch.  Must be > 0.
+   * @return values for the specified versions
+   * @throws IOException
+   */
+  Cell[] get(HStoreKey key, int numVersions) throws IOException {
+    if (numVersions <= 0) {
+      throw new IllegalArgumentException("Number of versions must be > 0");
+    }
+    
+    this.lock.readLock().lock();
+    long now = System.currentTimeMillis();
+    try {
+      // Check the memcache
+      List<Cell> results = this.memcache.get(key, numVersions);
+      // If we got sufficient versions from memcache, return. 
+      if (results.size() == numVersions) {
+        return results.toArray(new Cell[results.size()]);
+      }
+
+      // Keep a list of deleted cell keys.  We need this because as we go through
+      // the store files, the cell with the delete marker may be in one file and
+      // the old non-delete cell value in a later store file. If we don't keep
+      // around the fact that the cell was deleted in a newer record, we end up
+      // returning the old value if user is asking for more than one version.
+      // This List of deletes should not large since we are only keeping rows
+      // and columns that match those set on the scanner and which have delete
+      // values.  If memory usage becomes an issue, could redo as bloom filter.
+      Map<byte [], List<Long>> deletes =
+        new TreeMap<byte [], List<Long>>(Bytes.BYTES_COMPARATOR);
+      // This code below is very close to the body of the getKeys method.
+      MapFile.Reader[] maparray = getReaders();
+      for(int i = maparray.length - 1; i >= 0; i--) {
+        MapFile.Reader map = maparray[i];
+        synchronized(map) {
+          map.reset();
+          ImmutableBytesWritable readval = new ImmutableBytesWritable();
+          HStoreKey readkey = (HStoreKey)map.getClosest(key, readval);
+          if (readkey == null) {
+            // map.getClosest returns null if the passed key is > than the
+            // last key in the map file.  getClosest is a bit of a misnomer
+            // since it returns exact match or the next closest key AFTER not
+            // BEFORE.
+            continue;
+          }
+          if (!readkey.matchesRowCol(key)) {
+            continue;
+          }
+          if (!isDeleted(readkey, readval.get(), true, deletes)) {
+            if (ttl == HConstants.FOREVER ||
+                    now < readkey.getTimestamp() + ttl) {
+              results.add(new Cell(readval.get(), readkey.getTimestamp()));
+            } else {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("get: " + readkey + ": expired, skipped");
+              }
+            }
+            // Perhaps only one version is wanted.  I could let this
+            // test happen later in the for loop test but it would cost
+            // the allocation of an ImmutableBytesWritable.
+            if (hasEnoughVersions(numVersions, results)) {
+              break;
+            }
+          }
+          for (readval = new ImmutableBytesWritable();
+              map.next(readkey, readval) &&
+              readkey.matchesRowCol(key) &&
+              !hasEnoughVersions(numVersions, results);
+              readval = new ImmutableBytesWritable()) {
+            if (!isDeleted(readkey, readval.get(), true, deletes)) {
+              if (ttl == HConstants.FOREVER ||
+                    now < readkey.getTimestamp() + ttl) {
+                results.add(new Cell(readval.get(), readkey.getTimestamp()));
+              } else {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("get: " + readkey + ": expired, skipped");
+                }
+              }
+            }
+          }
+        }
+        if (hasEnoughVersions(numVersions, results)) {
+          break;
+        }
+      }
+      return results.size() == 0 ?
+        null : results.toArray(new Cell[results.size()]);
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+  
+  /**
+   * Small method to check if we are over the max number of versions
+   * or we acheived this family max versions. 
+   * The later happens when we have the situation described in HBASE-621.
+   * @param numVersions
+   * @param results
+   * @return 
+   */
+  private boolean hasEnoughVersions(final int numVersions,
+      final List<Cell> results) {
+    return (results.size() >= numVersions || results.size() >= family
+            .getMaxVersions());
+  }
+
+  /**
+   * Get <code>versions</code> keys matching the origin key's
+   * row/column/timestamp and those of an older vintage
+   * Default access so can be accessed out of {@link HRegionServer}.
+   * @param origin Where to start searching.
+   * @param versions How many versions to return. Pass
+   * {@link HConstants.ALL_VERSIONS} to retrieve all. Versions will include
+   * size of passed <code>allKeys</code> in its count.
+   * @param allKeys List of keys prepopulated by keys we found in memcache.
+   * This method returns this passed list with all matching keys found in
+   * stores appended.
+   * @return The passed <code>allKeys</code> with <code>versions</code> of
+   * matching keys found in store files appended.
+   * @throws IOException
+   */
+  List<HStoreKey> getKeys(final HStoreKey origin, final int versions)
+  throws IOException {
+      
+    List<HStoreKey> keys = this.memcache.getKeys(origin, versions);
+    if (keys.size() >= versions) {
+      return keys;
+    }
+    
+    // This code below is very close to the body of the get method.
+    this.lock.readLock().lock();
+    long now = System.currentTimeMillis();
+    try {
+      MapFile.Reader[] maparray = getReaders();
+      for(int i = maparray.length - 1; i >= 0; i--) {
+        MapFile.Reader map = maparray[i];
+        synchronized(map) {
+          map.reset();
+          
+          // do the priming read
+          ImmutableBytesWritable readval = new ImmutableBytesWritable();
+          HStoreKey readkey = (HStoreKey)map.getClosest(origin, readval);
+          if (readkey == null) {
+            // map.getClosest returns null if the passed key is > than the
+            // last key in the map file.  getClosest is a bit of a misnomer
+            // since it returns exact match or the next closest key AFTER not
+            // BEFORE.
+            continue;
+          }
+          
+          do{
+            // if the row matches, we might want this one.
+            if (rowMatches(origin, readkey)) {
+              // if the cell matches, then we definitely want this key.
+              if (cellMatches(origin, readkey)) {
+                // store the key if it isn't deleted or superceeded by what's
+                // in the memcache
+                if (!isDeleted(readkey, readval.get(), false, null) &&
+                    !keys.contains(readkey)) {
+                  if (ttl == HConstants.FOREVER ||
+                        now < readkey.getTimestamp() + ttl) {
+                    keys.add(new HStoreKey(readkey));
+                  } else {
+                    if (LOG.isDebugEnabled()) {
+                      LOG.debug("getKeys: " + readkey +
+                          ": expired, skipped");
+                    }
+                  }
+
+                  // if we've collected enough versions, then exit the loop.
+                  if (keys.size() >= versions) {
+                    break;
+                  }
+                }
+              } else {
+                // the cell doesn't match, but there might be more with different
+                // timestamps, so move to the next key
+                continue;
+              }
+            } else {
+              // the row doesn't match, so we've gone too far.
+              break;
+            }
+          } while (map.next(readkey, readval)); // advance to the next key
+        }
+      }
+      
+      return keys;
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+  
+  /**
+   * Find the key that matches <i>row</i> exactly, or the one that immediately
+   * preceeds it. WARNING: Only use this method on a table where writes occur 
+   * with stricly increasing timestamps. This method assumes this pattern of 
+   * writes in order to make it reasonably performant. 
+   */
+  byte [] getRowKeyAtOrBefore(final byte [] row)
+  throws IOException{
+    // Map of HStoreKeys that are candidates for holding the row key that
+    // most closely matches what we're looking for. We'll have to update it 
+    // deletes found all over the place as we go along before finally reading
+    // the best key out of it at the end.
+    SortedMap<HStoreKey, Long> candidateKeys = new TreeMap<HStoreKey, Long>();
+    
+    // Obtain read lock
+    this.lock.readLock().lock();
+    try {
+      // Process each store file
+      MapFile.Reader[] maparray = getReaders();
+      for (int i = maparray.length - 1; i >= 0; i--) {
+        // update the candidate keys from the current map file
+        rowAtOrBeforeFromMapFile(maparray[i], row, candidateKeys);
+      }
+      
+      // Finally, check the memcache
+      this.memcache.getRowKeyAtOrBefore(row, candidateKeys);
+      
+      // Return the best key from candidateKeys
+      return candidateKeys.isEmpty()? null: candidateKeys.lastKey().getRow();
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+  
+  /**
+   * Check an individual MapFile for the row at or before a given key 
+   * and timestamp
+   */
+  private void rowAtOrBeforeFromMapFile(MapFile.Reader map, final byte [] row, 
+    SortedMap<HStoreKey, Long> candidateKeys)
+  throws IOException {
+    ImmutableBytesWritable readval = new ImmutableBytesWritable();
+    HStoreKey readkey = new HStoreKey();
+
+    synchronized(map) {
+      // don't bother with the rest of this if the file is empty
+      map.reset();
+      if (!map.next(readkey, readval)) {
+        return;
+      }
+
+      long now = System.currentTimeMillis();
+      
+      // if there aren't any candidate keys yet, we'll do some things slightly
+      // different 
+      if (candidateKeys.isEmpty()) {
+        rowKeyFromMapFileEmptyKeys(map, row, candidateKeys, now);
+      } else {
+        rowKeyAtOrBeforeExistingCandKeys(map, row, candidateKeys, now);
+      }
+    }
+  }
+  
+  private void rowKeyFromMapFileEmptyKeys(MapFile.Reader map, byte[] row, 
+    SortedMap<HStoreKey, Long> candidateKeys, long now) 
+  throws IOException {
+    
+    HStoreKey searchKey = new HStoreKey(row);
+    ImmutableBytesWritable readval = new ImmutableBytesWritable();
+    HStoreKey readkey = new HStoreKey();
+    
+    // if the row we're looking for is past the end of this mapfile, just
+    // save time and add the last key to the candidates.
+    HStoreKey finalKey = new HStoreKey(); 
+    map.finalKey(finalKey);
+    if (Bytes.compareTo(finalKey.getRow(), row) < 0) {
+      candidateKeys.put(stripTimestamp(finalKey), 
+        new Long(finalKey.getTimestamp()));
+      return;
+    }
+    
+    HStoreKey deletedOrExpiredRow = null;
+    boolean foundCandidate = false;
+    while (!foundCandidate) {
+      // seek to the exact row, or the one that would be immediately before it
+      readkey = (HStoreKey)map.getClosest(searchKey, readval, true);
+
+      if (readkey == null) {
+        // didn't find anything that would match, so return
+        return;
+      }
+
+      do {
+        // if we have an exact match on row, and it's not a delete, save this
+        // as a candidate key
+        if (Bytes.equals(readkey.getRow(), row)) {
+          if (!HLogEdit.isDeleted(readval.get())) {
+            if (ttl == HConstants.FOREVER || 
+                now < readkey.getTimestamp() + ttl) {
+              candidateKeys.put(stripTimestamp(readkey), 
+                  new Long(readkey.getTimestamp()));
+              foundCandidate = true;
+              continue;
+            }
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("rowAtOrBeforeFromMapFile:" + readkey +
+              ": expired, skipped");
+            }
+          }
+          deletedOrExpiredRow = stripTimestamp(readkey);
+        } else if (Bytes.compareTo(readkey.getRow(), row) > 0 ) {
+          // if the row key we just read is beyond the key we're searching for,
+          // then we're done. return.
+          break;
+        } else {
+          // so, the row key doesn't match, but we haven't gone past the row
+          // we're seeking yet, so this row is a candidate for closest
+          // (assuming that it isn't a delete).
+          if (!HLogEdit.isDeleted(readval.get())) {
+            if (ttl == HConstants.FOREVER || 
+                now < readkey.getTimestamp() + ttl) {
+              candidateKeys.put(stripTimestamp(readkey), 
+                  new Long(readkey.getTimestamp()));
+              foundCandidate = true;
+              continue;
+            }
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("rowAtOrBeforeFromMapFile:" + readkey +
+              ": expired, skipped");
+            }
+          }
+          deletedOrExpiredRow = stripTimestamp(readkey);
+        }        
+      } while(map.next(readkey, readval));
+      
+      // If we get here and have no candidates but we did find a deleted or
+      // expired candidate, we need to look at the key before that
+      
+      if (!foundCandidate && deletedOrExpiredRow != null) {
+        searchKey = deletedOrExpiredRow;
+        deletedOrExpiredRow = null;
+        
+      } else {
+        // No candidates and no deleted or expired candidates. Give up.
+        break;
+      }
+    }
+  
+    // arriving here just means that we consumed the whole rest of the map
+    // without going "past" the key we're searching for. we can just fall
+    // through here.
+  }
+  
+  
+  private void rowKeyAtOrBeforeExistingCandKeys(MapFile.Reader map, byte[] row,
+    SortedMap<HStoreKey, Long> candidateKeys, long now) 
+  throws IOException {
+    
+    HStoreKey strippedKey = null;
+    ImmutableBytesWritable readval = new ImmutableBytesWritable();
+    HStoreKey readkey = new HStoreKey();
+
+
+    // if there are already candidate keys, we need to start our search 
+    // at the earliest possible key so that we can discover any possible
+    // deletes for keys between the start and the search key.
+    HStoreKey searchKey = new HStoreKey(candidateKeys.firstKey().getRow());
+
+    // if the row we're looking for is past the end of this mapfile, just
+    // save time and add the last key to the candidates.
+    HStoreKey finalKey = new HStoreKey(); 
+    map.finalKey(finalKey);
+    if (Bytes.compareTo(finalKey.getRow(), searchKey.getRow()) < 0) {
+      strippedKey = stripTimestamp(finalKey);
+
+      // if the candidate keys has a cell like this one already,
+      // then we might want to update the timestamp we're using on it
+      if (candidateKeys.containsKey(strippedKey)) {
+        long bestCandidateTs = 
+          candidateKeys.get(strippedKey).longValue();
+        if (bestCandidateTs < finalKey.getTimestamp()) {
+          candidateKeys.put(strippedKey, new Long(finalKey.getTimestamp()));
+        } 
+      } else {
+        // otherwise, this is a new key, so put it up as a candidate
+        candidateKeys.put(strippedKey, new Long(finalKey.getTimestamp()));
+      }
+      return;
+    }
+
+    // seek to the exact row, or the one that would be immediately before it
+    readkey = (HStoreKey)map.getClosest(searchKey, readval, true);
+
+    if (readkey == null) {
+      // didn't find anything that would match, so return
+      return;
+    }
+
+    do {
+      // if we have an exact match on row, and it's not a delete, save this
+      // as a candidate key
+      if (Bytes.equals(readkey.getRow(), row)) {
+        strippedKey = stripTimestamp(readkey);
+        if (!HLogEdit.isDeleted(readval.get())) {
+          if (ttl == HConstants.FOREVER || 
+              now < readkey.getTimestamp() + ttl) {
+            candidateKeys.put(strippedKey,
+                new Long(readkey.getTimestamp()));
+          } else {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("rowAtOrBeforeFromMapFile: " + readkey +
+                  ": expired, skipped");
+            }
+          }
+        } else {
+          // if the candidate keys contain any that might match by timestamp,
+          // then check for a match and remove it if it's too young to 
+          // survive the delete 
+          if (candidateKeys.containsKey(strippedKey)) {
+            long bestCandidateTs =
+              candidateKeys.get(strippedKey).longValue();
+            if (bestCandidateTs <= readkey.getTimestamp()) {
+              candidateKeys.remove(strippedKey);
+            } 
+          }
+        }
+      } else if (Bytes.compareTo(readkey.getRow(), row) > 0 ) {
+        // if the row key we just read is beyond the key we're searching for,
+        // then we're done. return.
+        return;
+      } else {
+        strippedKey = stripTimestamp(readkey);
+
+        // so, the row key doesn't match, but we haven't gone past the row
+        // we're seeking yet, so this row is a candidate for closest 
+        // (assuming that it isn't a delete).
+        if (!HLogEdit.isDeleted(readval.get())) {
+          if (ttl == HConstants.FOREVER || 
+              now < readkey.getTimestamp() + ttl) {
+            candidateKeys.put(strippedKey, readkey.getTimestamp());
+          } else {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("rowAtOrBeforeFromMapFile: " + readkey +
+                  ": expired, skipped");
+            }
+          }
+        } else {
+          // if the candidate keys contain any that might match by timestamp,
+          // then check for a match and remove it if it's too young to 
+          // survive the delete 
+          if (candidateKeys.containsKey(strippedKey)) {
+            long bestCandidateTs = 
+              candidateKeys.get(strippedKey).longValue();
+            if (bestCandidateTs <= readkey.getTimestamp()) {
+              candidateKeys.remove(strippedKey);
+            } 
+          }
+        }      
+      }
+    } while(map.next(readkey, readval));    
+  }
+  
+  static HStoreKey stripTimestamp(HStoreKey key) {
+    return new HStoreKey(key.getRow(), key.getColumn());
+  }
+    
+  /**
+   * Test that the <i>target</i> matches the <i>origin</i>. If the 
+   * <i>origin</i> has an empty column, then it's assumed to mean any column 
+   * matches and only match on row and timestamp. Otherwise, it compares the
+   * keys with HStoreKey.matchesRowCol().
+   * @param origin The key we're testing against
+   * @param target The key we're testing
+   */
+  private boolean cellMatches(HStoreKey origin, HStoreKey target){
+    // if the origin's column is empty, then we're matching any column
+    if (Bytes.equals(origin.getColumn(), HConstants.EMPTY_BYTE_ARRAY)){
+      // if the row matches, then...
+      if (Bytes.equals(target.getRow(), origin.getRow())) {
+        // check the timestamp
+        return target.getTimestamp() <= origin.getTimestamp();
+      }
+      return false;
+    }
+    // otherwise, we want to match on row and column
+    return target.matchesRowCol(origin);
+  }
+    
+  /**
+   * Test that the <i>target</i> matches the <i>origin</i>. If the <i>origin</i>
+   * has an empty column, then it just tests row equivalence. Otherwise, it uses
+   * HStoreKey.matchesRowCol().
+   * @param origin Key we're testing against
+   * @param target Key we're testing
+   */
+  private boolean rowMatches(HStoreKey origin, HStoreKey target){
+    // if the origin's column is empty, then we're matching any column
+    if (Bytes.equals(origin.getColumn(), HConstants.EMPTY_BYTE_ARRAY)) {
+      // if the row matches, then...
+      return Bytes.equals(target.getRow(), origin.getRow());
+    }
+    // otherwise, we want to match on row and column
+    return target.matchesRowCol(origin);
+  }
+  
+  /**
+   * Determines if HStore can be split
+   * 
+   * @return a StoreSize if store can be split, null otherwise
+   */
+  StoreSize checkSplit() {
+    if (this.storefiles.size() <= 0) {
+      return null;
+    }
+    if (storeSize < this.desiredMaxFileSize) {
+      return null;
+    }
+    this.lock.readLock().lock();
+    try {
+      // Not splitable if we find a reference store file present in the store.
+      boolean splitable = true;
+      long maxSize = 0L;
+      Long mapIndex = Long.valueOf(0L);
+      // Iterate through all the MapFiles
+      synchronized (storefiles) {
+        for (Map.Entry<Long, HStoreFile> e: storefiles.entrySet()) {
+          HStoreFile curHSF = e.getValue();
+          long size = curHSF.length();
+          if (size > maxSize) {
+            // This is the largest one so far
+            maxSize = size;
+            mapIndex = e.getKey();
+          }
+          if (splitable) {
+            splitable = !curHSF.isReference();
+          }
+        }
+      }
+      if (!splitable) {
+        return null;
+      }
+      MapFile.Reader r = this.readers.get(mapIndex);
+
+      // seek back to the beginning of mapfile
+      r.reset();
+
+      // get the first and last keys
+      HStoreKey firstKey = new HStoreKey();
+      HStoreKey lastKey = new HStoreKey();
+      Writable value = new ImmutableBytesWritable();
+      r.next(firstKey, value);
+      r.finalKey(lastKey);
+
+      // get the midkey
+      HStoreKey mk = (HStoreKey)r.midKey();
+      if (mk != null) {
+        // if the midkey is the same as the first and last keys, then we cannot
+        // (ever) split this region. 
+        if (Bytes.equals(mk.getRow(), firstKey.getRow()) && 
+            Bytes.equals(mk.getRow(), lastKey.getRow())) {
+          return null;
+        }
+        return new StoreSize(maxSize, mk.getRow());
+      }
+    } catch(IOException e) {
+      LOG.warn("Failed getting store size for " + this.storeNameStr, e);
+    } finally {
+      this.lock.readLock().unlock();
+    }
+    return null;
+  }
+  
+  /** @return aggregate size of HStore */
+  public long getSize() {
+    return storeSize;
+  }
+  
+  //////////////////////////////////////////////////////////////////////////////
+  // File administration
+  //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Return a scanner for both the memcache and the HStore files
+   */
+  protected InternalScanner getScanner(long timestamp, byte [][] targetCols,
+      byte [] firstRow, RowFilterInterface filter)
+  throws IOException {
+    lock.readLock().lock();
+    try {
+      return new HStoreScanner(this, targetCols, firstRow, timestamp, filter);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString() {
+    return this.storeNameStr;
+  }
+
+  /**
+   * @param p Path to check.
+   * @return True if the path has format of a HStoreFile reference.
+   */
+  public static boolean isReference(final Path p) {
+    return isReference(p, REF_NAME_PARSER.matcher(p.getName()));
+  }
+ 
+  private static boolean isReference(final Path p, final Matcher m) {
+    if (m == null || !m.matches()) {
+      LOG.warn("Failed match of store file name " + p.toString());
+      throw new RuntimeException("Failed match of store file name " +
+          p.toString());
+    }
+    return m.groupCount() > 1 && m.group(2) != null;
+  }
+
+  /**
+   * @return Current list of store files.
+   */
+  SortedMap<Long, HStoreFile> getStorefiles() {
+    synchronized (this.storefiles) {
+      SortedMap<Long, HStoreFile> copy =
+        new TreeMap<Long, HStoreFile>(this.storefiles);
+      return copy;
+    }
+  }
+  
+  class StoreSize {
+    private final long size;
+    private final byte[] key;
+    StoreSize(long size, byte[] key) {
+      this.size = size;
+      this.key = new byte[key.length];
+      System.arraycopy(key, 0, this.key, 0, key.length);
+    }
+    /* @return the size */
+    long getSize() {
+      return size;
+    }
+    /* @return the key */
+    byte[] getKey() {
+      return key;
+    }
+  }
+}
\ No newline at end of file