You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2009/04/12 12:39:58 UTC

svn commit: r764289 [3/8] - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/filter/ src/java/org/apache/hadoop/hbase/io/ src/java/org/apache/hadoop/hbase/io/hfile/ s...

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=764289&r1=764288&r2=764289&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sun Apr 12 10:39:55 2009
@@ -22,15 +22,16 @@
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Set;
-import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -42,17 +43,17 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ColumnNameParseException;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HStoreKey;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionHistorian;
+import org.apache.hadoop.hbase.ValueOverMaxLengthException;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.io.BatchOperation;
 import org.apache.hadoop.hbase.io.BatchUpdate;
@@ -122,10 +123,8 @@
 
   private final Map<Integer, byte []> locksToRows =
     new ConcurrentHashMap<Integer, byte []>();
-  private final Map<Integer, TreeMap<HStoreKey, byte []>> targetColumns =
-      new ConcurrentHashMap<Integer, TreeMap<HStoreKey, byte []>>();
-  protected final Map<Integer, Store> stores =
-    new ConcurrentHashMap<Integer, Store>();
+  protected final Map<byte [], Store> stores =
+    new ConcurrentSkipListMap<byte [], Store>(KeyValue.FAMILY_COMPARATOR);
   final AtomicLong memcacheSize = new AtomicLong(0);
 
   // This is the table subdirectory.
@@ -136,6 +135,8 @@
   final HRegionInfo regionInfo;
   final Path regiondir;
   private final Path regionCompactionDir;
+  KeyValue.KVComparator comparator;
+  private KeyValue.KVComparator comparatorIgnoreTimestamp;
 
   /*
    * Set this when scheduling compaction if want the next compaction to be a
@@ -196,10 +197,6 @@
   private long minSequenceId;
   final AtomicInteger activeScannerCount = new AtomicInteger(0);
 
-  //////////////////////////////////////////////////////////////////////////////
-  // Constructor
-  //////////////////////////////////////////////////////////////////////////////
-
   /**
    * HRegion constructor.
    *
@@ -219,10 +216,14 @@
    * @param flushListener an object that implements CacheFlushListener or null
    * making progress to master -- otherwise master might think region deploy
    * failed.  Can be null.
+   * @throws IOException 
    */
   public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf, 
       HRegionInfo regionInfo, FlushRequester flushListener) {
     this.basedir = basedir;
+    this.comparator = regionInfo.getComparator();
+    this.comparatorIgnoreTimestamp =
+      this.comparator.getComparatorIgnoringTimestamps();
     this.log = log;
     this.fs = fs;
     this.conf = conf;
@@ -272,7 +273,7 @@
     long minSeqId = Integer.MAX_VALUE;
     for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
       Store store = instantiateHStore(this.basedir, c, oldLogFile, reporter);
-      this.stores.put(Bytes.mapKey(c.getName()), store);
+      this.stores.put(c.getName(), store);
       long storeSeqId = store.getMaxSequenceId();
       if (storeSeqId > maxSeqId) {
         maxSeqId = storeSeqId;
@@ -524,7 +525,7 @@
    * @return two brand-new (and open) HRegions or null if a split is not needed
    * @throws IOException
    */
-  HRegion[] splitRegion(final byte [] splitRow) throws IOException {
+  HRegion [] splitRegion(final byte [] splitRow) throws IOException {
     synchronized (splitLock) {
       if (closed.get()) {
         return null;
@@ -532,11 +533,13 @@
       // Add start/end key checking: hbase-428.
       byte [] startKey = this.regionInfo.getStartKey();
       byte [] endKey = this.regionInfo.getEndKey();
-      if (HStoreKey.equalsTwoRowKeys(startKey, splitRow)) {
+      if (this.comparator.matchingRows(startKey, 0, startKey.length,
+          splitRow, 0, splitRow.length)) {
         LOG.debug("Startkey and midkey are same, not splitting");
         return null;
       }
-      if (HStoreKey.equalsTwoRowKeys(splitRow, endKey)) {
+      if (this.comparator.matchingRows(splitRow, 0, splitRow.length,
+          endKey, 0, endKey.length)) {
         LOG.debug("Endkey and midkey are same, not splitting");
         return null;
       }
@@ -704,10 +707,10 @@
         doRegionCompactionPrep();
         long maxSize = -1;
         for (Store store: stores.values()) {
-          final Store.StoreSize size = store.compact(majorCompaction);
-          if (size != null && size.getSize() > maxSize) {
-            maxSize = size.getSize();
-            splitRow = size.getSplitRow();
+          final Store.StoreSize ss = store.compact(majorCompaction);
+          if (ss != null && ss.getSize() > maxSize) {
+            maxSize = ss.getSize();
+            splitRow = ss.getSplitRow();
           }
         }
         doRegionCompactionCleanup();
@@ -934,16 +937,14 @@
    * @param column
    * @param ts
    * @param nv
-   * @return array of values one element per version that matches the timestamp, 
-   * or null if there are no matches.
+   * @return Results or null if none.
    * @throws IOException
    */
-  public Cell[] get(final byte[] row, final byte[] column, final long ts,
+  public List<KeyValue> get(final byte[] row, final byte[] column, final long ts,
       final int nv) 
   throws IOException {
-    long timestamp = ts == -1 ? HConstants.LATEST_TIMESTAMP : ts;
-    int numVersions = nv == -1 ? 1 : nv;
-
+    long timestamp = ts == -1? HConstants.LATEST_TIMESTAMP : ts;
+    int numVersions = nv == -1? 1 : nv;
     splitsAndClosesLock.readLock().lock();
     try {
       if (this.closed.get()) {
@@ -953,17 +954,98 @@
       checkRow(row);
       checkColumn(column);
       // Don't need a row lock for a simple get
-      HStoreKey key = new HStoreKey(row, column, timestamp);
-      Cell[] result = getStore(column).get(key, numVersions);
+      List<KeyValue> result = getStore(column).
+        get(KeyValue.createFirstOnRow(row, column, timestamp), numVersions);
       // Guarantee that we return null instead of a zero-length array, 
       // if there are no results to return.
-      return (result == null || result.length == 0)? null : result;
+      return (result == null || result.isEmpty())? null : result;
     } finally {
       splitsAndClosesLock.readLock().unlock();
     }
   }
 
   /**
+   * Data structure with a counter that is accessible rather than create a
+   * new Integer every time we want to up the counter.  Initializes at count 1.
+   */
+  static class Counter {
+    int counter = 1;
+  }
+
+  /*
+   * Check to see if we've not gone over threshold for this particular
+   * column.
+   * @param kv
+   * @param versions
+   * @param versionsCount
+   * @return True if its ok to add current value.
+   */
+  static boolean okToAddResult(final KeyValue kv, final int versions,
+      final Map<KeyValue, HRegion.Counter> versionsCount) {
+    if (versionsCount == null) {
+      return true;
+    }
+    if (versionsCount.containsKey(kv)) {
+      if (versionsCount.get(kv).counter < versions) {
+        return true;
+      }
+    } else {
+      return true;
+    }
+    return false;
+  }
+
+  /*
+   * Used adding item found to list of results getting.
+   * @param kv
+   * @param versionsCount
+   * @param results
+   */
+  static void addResult(final KeyValue kv,
+      final Map<KeyValue, HRegion.Counter> versionsCount,
+      final List<KeyValue> results) {
+    // Don't add if already present; i.e. ignore second entry.
+    if (results.contains(kv)) return;
+    results.add(kv);
+    if (versionsCount == null) {
+      return;
+    }
+    if (!versionsCount.containsKey(kv)) {
+      versionsCount.put(kv, new HRegion.Counter());
+    } else {
+      versionsCount.get(kv).counter++;
+    }
+  }
+
+  /*
+   * @param versions Number of versions to get.
+   * @param versionsCount May be null.
+   * @param columns Columns we want to fetch.
+   * @return True if has enough versions.
+   */
+  static boolean hasEnoughVersions(final int versions,
+      final Map<KeyValue, HRegion.Counter> versionsCount,
+      final Set<byte []> columns) {
+    if (columns == null || versionsCount == null) {
+      // Wants all columns so just keep going
+      return false;
+    }
+    if (columns.size() > versionsCount.size()) {
+      return false;
+    }
+    if (versions == 1) {
+      return true;
+    }
+    // Need to look at each to make sure at least versions.
+    for (Map.Entry<KeyValue, HRegion.Counter> e: versionsCount.entrySet()) {
+      if (e.getValue().counter < versions) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
    * Fetch all the columns for the indicated row at a specified timestamp.
    * Returns a HbaseMapWritable that maps column names to values.
    *
@@ -982,7 +1064,7 @@
    * @throws IOException
    */
   public HbaseMapWritable<byte [], Cell> getFull(final byte [] row,
-      final Set<byte []> columns, final long ts,
+      final NavigableSet<byte []> columns, final long ts,
       final int numVersions, final Integer lockid) 
   throws IOException {
     // Check columns passed
@@ -991,16 +1073,16 @@
         checkColumn(column);
       }
     }
-    HStoreKey key = new HStoreKey(row, ts);
+    List<KeyValue> keyvalues = new ArrayList<KeyValue>();
+    Map<KeyValue, Counter> versionCounter =
+      new TreeMap<KeyValue, Counter>(this.comparatorIgnoreTimestamp);
     Integer lid = getLock(lockid,row);
     HashSet<Store> storeSet = new HashSet<Store>();
     try {
-      HbaseMapWritable<byte [], Cell> result =
-        new HbaseMapWritable<byte [], Cell>();
       // Get the concerned columns or all of them
       if (columns != null) {
         for (byte[] bs : columns) {
-          Store store = stores.get(Bytes.mapKey(HStoreKey.getFamily(bs)));
+          Store store = stores.get(bs);
           if (store != null) {
             storeSet.add(store);
           }
@@ -1008,25 +1090,31 @@
       } else {
         storeSet.addAll(stores.values());
       }
+      long timestamp =
+        (ts == HConstants.LATEST_TIMESTAMP)? System.currentTimeMillis(): ts;
+      KeyValue key = KeyValue.createFirstOnRow(row, timestamp);
       // For each column name that is just a column family, open the store
       // related to it and fetch everything for that row. HBASE-631
       // Also remove each store from storeSet so that these stores
       // won't be opened for no reason. HBASE-783
       if (columns != null) {
-        for (byte[] bs : columns) {
-          if (HStoreKey.getFamilyDelimiterIndex(bs) == (bs.length - 1)) {
-            Store store = stores.get(Bytes.mapKey(HStoreKey.getFamily(bs)));
-            store.getFull(key, null, numVersions, result);
+        for (byte [] bs : columns) {
+          // TODO: Fix so we use comparator in KeyValue that looks at
+          // column family portion only.
+          if (KeyValue.getFamilyDelimiterIndex(bs, 0, bs.length) == (bs.length - 1)) {
+            Store store = stores.get(bs);
+            store.getFull(key, null, null, numVersions, versionCounter,
+              keyvalues, timestamp);
             storeSet.remove(store);
           }
         }
       }
-      
       for (Store targetStore: storeSet) {
-        targetStore.getFull(key, columns, numVersions, result);
+        targetStore.getFull(key, columns, null, numVersions, versionCounter,
+          keyvalues, timestamp);
       }
       
-      return result;
+      return Cell.createCells(keyvalues);
     } finally {
       if(lockid == null) releaseRowLock(lid);
     }
@@ -1057,77 +1145,35 @@
    * @throws IOException
    */
   public RowResult getClosestRowBefore(final byte [] row,
-      final byte [] columnFamily)
+    final byte [] columnFamily)
   throws IOException{
     // look across all the HStores for this region and determine what the
     // closest key is across all column families, since the data may be sparse
-    HStoreKey key = null;
+    KeyValue key = null;
     checkRow(row);
     splitsAndClosesLock.readLock().lock();
     try {
       Store store = getStore(columnFamily);
+      KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
       // get the closest key. (HStore.getRowKeyAtOrBefore can return null)
-      byte [] closestKey = store.getRowKeyAtOrBefore(row);
-      // If it happens to be an exact match, we can stop.
-      // Otherwise, we need to check if it's the max and move to the next
-      if (closestKey != null) {
-        if (HStoreKey.equalsTwoRowKeys(row, closestKey)) {
-          key = new HStoreKey(closestKey);
-        }
-        if (key == null) {
-          key = new HStoreKey(closestKey);
-        }
-      }
+      key = store.getRowKeyAtOrBefore(kv);
       if (key == null) {
         return null;
       }
-
-      // Now that we've found our key, get the values
-      HbaseMapWritable<byte [], Cell> cells =
-        new HbaseMapWritable<byte [], Cell>();
-      // This will get all results for this store.
-      store.getFull(key, null, 1, cells);
-      return new RowResult(key.getRow(), cells);
+      List<KeyValue> results = new ArrayList<KeyValue>();
+      // This will get all results for this store.  TODO: Do I have to make a
+      // new key?
+      if (!this.comparator.matchingRows(kv, key)) {
+        kv = new KeyValue(key.getRow(), HConstants.LATEST_TIMESTAMP);
+      }
+      store.getFull(kv, null, null, 1, null, results, System.currentTimeMillis());
+      // Convert to RowResult.  TODO: Remove need to do this.
+      return RowResult.createRowResult(results);
     } finally {
       splitsAndClosesLock.readLock().unlock();
     }
   }
 
-  /*
-   * Get <code>versions</code> keys matching the origin key's
-   * row/column/timestamp and those of an older vintage.
-   * Public so available when debugging.
-   * @param origin Where to start searching.
-   * @param versions How many versions to return. Pass HConstants.ALL_VERSIONS
-   * to retrieve all.
-   * @return Ordered list of <code>versions</code> keys going from newest back.
-   * @throws IOException
-   */
-  private Set<HStoreKey> getKeys(final HStoreKey origin, final int versions)
-  throws IOException {
-    Set<HStoreKey> keys = new TreeSet<HStoreKey>();
-    Collection<Store> storesToCheck = null;
-    if (origin.getColumn() == null || origin.getColumn().length == 0) {
-      // All families
-      storesToCheck = this.stores.values();
-    } else {
-      storesToCheck = new ArrayList<Store>(1);
-      storesToCheck.add(getStore(origin.getColumn()));
-    }
-    long now = System.currentTimeMillis();
-    for (Store targetStore: storesToCheck) {
-      if (targetStore != null) {
-        // Pass versions without modification since in the store getKeys, it
-        // includes the size of the passed <code>keys</code> array when counting.
-        List<HStoreKey> r = targetStore.getKeys(origin, versions, now, null);
-        if (r != null) {
-          keys.addAll(r);
-        }
-      }
-    }
-    return keys;
-  }
-  
   /**
    * Return an iterator that scans over the HRegion, returning the indicated 
    * columns for only the rows that match the data filter.  This Iterator must
@@ -1153,13 +1199,18 @@
         throw new IOException("Region " + this + " closed");
       }
       HashSet<Store> storeSet = new HashSet<Store>();
+      NavigableSet<byte []> columns =
+        new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+      // Below we make up set of stores we want scanners on and we fill out the
+      // list of columns.
       for (int i = 0; i < cols.length; i++) {
-        Store s = stores.get(Bytes.mapKey(HStoreKey.getFamily(cols[i])));
+        columns.add(cols[i]);
+        Store s = stores.get(cols[i]);
         if (s != null) {
           storeSet.add(s);
         }
       }
-      return new HScanner(cols, firstRow, timestamp,
+      return new HScanner(columns, firstRow, timestamp,
         storeSet.toArray(new Store [storeSet.size()]), filter);
     } finally {
       newScannerLock.readLock().unlock();
@@ -1206,13 +1257,13 @@
   public void batchUpdate(BatchUpdate b, Integer lockid, boolean writeToWAL)
   throws IOException {
     checkReadOnly();
+    validateValuesLength(b);
 
     // Do a rough check that we have resources to accept a write.  The check is
     // 'rough' in that between the resource check and the call to obtain a 
     // read lock, resources may run out.  For now, the thought is that this
     // will be extremely rare; we'll deal with it when it happens.
     checkResources();
-
     splitsAndClosesLock.readLock().lock();
     try {
       // We obtain a per-row lock, so other clients will block while one client
@@ -1222,50 +1273,53 @@
       // invokes a HRegion#abort.
       byte [] row = b.getRow();
       // If we did not pass an existing row lock, obtain a new one
-      Integer lid = getLock(lockid,row);
-      long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP) ?
-          System.currentTimeMillis() : b.getTimestamp();
+      Integer lid = getLock(lockid, row);
+      long commitTime = b.getTimestamp() == LATEST_TIMESTAMP?
+        System.currentTimeMillis(): b.getTimestamp();
+      Set<byte []> latestTimestampDeletes = null;
+      List<KeyValue> edits = new ArrayList<KeyValue>();
       try {
-        List<byte []> deletes = null;
         for (BatchOperation op: b) {
-          HStoreKey key = new HStoreKey(row, op.getColumn(), commitTime);
-          byte[] val = null;
+          byte [] column = op.getColumn();
+          checkColumn(column);
+          KeyValue kv = null;
           if (op.isPut()) {
-            val = op.getValue();
+            byte [] val = op.getValue();
             if (HLogEdit.isDeleted(val)) {
-              throw new IOException("Cannot insert value: " + val);
+              throw new IOException("Cannot insert value: " +
+                Bytes.toString(val));
             }
+            kv = new KeyValue(row, column, commitTime, val);
           } else {
+            // Its a delete.
             if (b.getTimestamp() == LATEST_TIMESTAMP) {
-              // Save off these deletes
-              if (deletes == null) {
-                deletes = new ArrayList<byte []>();
+              // Save off these deletes of the most recent thing added on the
+              // family.
+              if (latestTimestampDeletes == null) {
+                latestTimestampDeletes =
+                  new TreeSet<byte []>(Bytes.BYTES_RAWCOMPARATOR);
               }
-              deletes.add(op.getColumn());
-            } else {
-              val = HLogEdit.DELETED_BYTES;
+              latestTimestampDeletes.add(op.getColumn());
+              continue;
             }
+            // Its an explicit timestamp delete
+            kv = new KeyValue(row, column, commitTime, KeyValue.Type.Delete,
+              HConstants.EMPTY_BYTE_ARRAY);
           }
-          if (val != null) {
-            localput(lid, key, val);
-          }
+          edits.add(kv);
         }
-        TreeMap<HStoreKey, byte[]> edits =
-          this.targetColumns.remove(lid);
-
-        if (edits != null && edits.size() > 0) {
+        if (!edits.isEmpty()) {
           update(edits, writeToWAL);
         }
-
-        if (deletes != null && deletes.size() > 0) {
-          // We have some LATEST_TIMESTAMP deletes to run.
-          for (byte [] column: deletes) {
+        if (latestTimestampDeletes != null &&
+            !latestTimestampDeletes.isEmpty()) {
+          // We have some LATEST_TIMESTAMP deletes to run.  Can't do them inline
+          // as edits.  Need to do individually after figuring which is latest
+          // timestamp to delete.
+          for (byte [] column: latestTimestampDeletes) {
             deleteMultiple(row, column, LATEST_TIMESTAMP, 1);
           }
         }
-      } catch (IOException e) {
-        this.targetColumns.remove(Long.valueOf(lid));
-        throw e;
       } finally {
         if(lockid == null) releaseRowLock(lid);
       }
@@ -1274,7 +1328,6 @@
     }
   }
 
-
   /**
    * Performs an atomic check and save operation. Checks if
    * the specified expected values have changed, and if not
@@ -1297,66 +1350,72 @@
     // should read the comments from the batchUpdate method
     boolean success = true;
     checkReadOnly();
+    validateValuesLength(b);
     checkResources();
     splitsAndClosesLock.readLock().lock();
     try {
       byte[] row = b.getRow();
       Integer lid = getLock(lockid,row);
       try {
-        Set<byte[]> keySet = expectedValues.keySet();
-        Map<byte[],Cell> actualValues = this.getFull(row,keySet,
-        HConstants.LATEST_TIMESTAMP, 1,lid);
+        NavigableSet<byte []> keySet =
+          new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+        keySet.addAll(expectedValues.keySet());
+        Map<byte[],Cell> actualValues = getFull(row, keySet,
+          HConstants.LATEST_TIMESTAMP, 1,lid);
         for (byte[] key : keySet) {
           // If test fails exit
           if(!Bytes.equals(actualValues.get(key).getValue(),
-            expectedValues.get(key))) {
+              expectedValues.get(key))) {
             success = false;
             break;
           }
         }
-        
         if (success) {
           long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP)?
             System.currentTimeMillis(): b.getTimestamp();
-          List<byte []> deletes = null;
+          Set<byte []> latestTimestampDeletes = null;
+          List<KeyValue> edits = new ArrayList<KeyValue>();
           for (BatchOperation op: b) {
-            HStoreKey key = new HStoreKey(row, op.getColumn(), commitTime);
-            byte[] val = null;
+            byte [] column = op.getColumn();
+            KeyValue kv = null;
             if (op.isPut()) {
-              val = op.getValue();
+              byte [] val = op.getValue();
               if (HLogEdit.isDeleted(val)) {
-                throw new IOException("Cannot insert value: " + val);
+                throw new IOException("Cannot insert value: " +
+                  Bytes.toString(val));
               }
+              kv = new KeyValue(row, column, commitTime, val);
             } else {
+              // Its a delete.
               if (b.getTimestamp() == LATEST_TIMESTAMP) {
-                // Save off these deletes
-                if (deletes == null) {
-                  deletes = new ArrayList<byte []>();
+                // Save off these deletes of the most recent thing added on
+                // the family.
+                if (latestTimestampDeletes == null) {
+                  latestTimestampDeletes =
+                    new TreeSet<byte []>(Bytes.BYTES_RAWCOMPARATOR);
                 }
-                deletes.add(op.getColumn());
+                latestTimestampDeletes.add(op.getColumn());
               } else {
-                val = HLogEdit.DELETED_BYTES;
+                // Its an explicit timestamp delete
+                kv = new KeyValue(row, column, commitTime,
+                  KeyValue.Type.Delete, HConstants.EMPTY_BYTE_ARRAY);
               }
             }
-            if (val != null) {
-              localput(lid, key, val);
-            }
+            edits.add(kv);
           }
-          TreeMap<HStoreKey, byte[]> edits =
-            this.targetColumns.remove(lid);
-          if (edits != null && edits.size() > 0) {
+          if (!edits.isEmpty()) {
             update(edits, writeToWAL);
           }
-          if (deletes != null && deletes.size() > 0) {
-            // We have some LATEST_TIMESTAMP deletes to run.
-            for (byte [] column: deletes) {
+          if (latestTimestampDeletes != null &&
+              !latestTimestampDeletes.isEmpty()) {
+            // We have some LATEST_TIMESTAMP deletes to run.  Can't do them inline
+            // as edits.  Need to do individually after figuring which is latest
+            // timestamp to delete.
+            for (byte [] column: latestTimestampDeletes) {
               deleteMultiple(row, column, LATEST_TIMESTAMP, 1);
             }
           }
         }
-      } catch (IOException e) {
-        this.targetColumns.remove(Long.valueOf(lid));
-        throw e;
       } finally {
         if(lockid == null) releaseRowLock(lid);
       }
@@ -1367,6 +1426,31 @@
   }
 
   /*
+   * Utility method to verify values length
+   * @param batchUpdate The update to verify
+   * @throws IOException Thrown if a value is too long
+   */
+  private void validateValuesLength(BatchUpdate batchUpdate)
+  throws IOException {
+    for (Iterator<BatchOperation> iter = 
+      batchUpdate.iterator(); iter.hasNext();) {
+      BatchOperation operation = iter.next();
+      if (operation.getValue() != null) {
+        HColumnDescriptor fam = this.regionInfo.getTableDesc().
+          getFamily(operation.getColumn());
+        if (fam != null) {
+          int maxLength = fam.getMaxValueLength();
+          if (operation.getValue().length > maxLength) {
+            throw new ValueOverMaxLengthException("Value in column "
+                + Bytes.toString(operation.getColumn()) + " is too long. "
+                + operation.getValue().length + " instead of " + maxLength);
+          }
+        }
+      }
+    }
+  }
+
+  /*
    * Check if resources to support an update.
    * 
    * Here we synchronize on HRegion, a broad scoped lock.  Its appropriate
@@ -1417,9 +1501,8 @@
     checkReadOnly();
     Integer lid = getLock(lockid,row);
     try {
-      // Delete ALL versions rather than MAX_VERSIONS.  If we just did
-      // MAX_VERSIONS, then if 2* MAX_VERSION cells, subsequent gets would
-      // get old stuff.
+      // Delete ALL versions rather than column family VERSIONS.  If we just did
+      // VERSIONS, then if 2* VERSION cells, subsequent gets would get old stuff.
       deleteMultiple(row, column, ts, ALL_VERSIONS);
     } finally {
       if(lockid == null) releaseRowLock(lid);
@@ -1433,25 +1516,28 @@
    * @param lockid Row lock
    * @throws IOException
    */
-  @SuppressWarnings("unchecked")
   public void deleteAll(final byte [] row, final long ts, final Integer lockid)
   throws IOException {
     checkReadOnly();
     Integer lid = getLock(lockid, row);
-    long now = System.currentTimeMillis();
+    long time = ts;
+    if (ts == HConstants.LATEST_TIMESTAMP) {
+      time = System.currentTimeMillis();
+    }
+    KeyValue kv = KeyValue.createFirstOnRow(row, time);
     try {
       for (Store store : stores.values()) {
-        List<HStoreKey> keys = store.getKeys(new HStoreKey(row, ts),
-          ALL_VERSIONS, now, null);
-        TreeMap<HStoreKey, byte []> edits = new TreeMap<HStoreKey, byte []>(
-          HStoreKey.getWritableComparator(store.getHRegionInfo()));
-        for (HStoreKey key: keys) {
-          edits.put(key, HLogEdit.DELETED_BYTES);
+        List<KeyValue> keyvalues = new ArrayList<KeyValue>();
+        store.getFull(kv, null, null, ALL_VERSIONS, null, keyvalues, time);
+        List<KeyValue> edits = new ArrayList<KeyValue>();
+        for (KeyValue key: keyvalues) {
+          // This is UGLY. COPY OF KEY PART OF KeyValue.
+          edits.add(key.cloneDelete());
         }
         update(edits);
       }
     } finally {
-      if(lockid == null) releaseRowLock(lid);
+      if (lockid == null) releaseRowLock(lid);
     }
   }
   
@@ -1465,21 +1551,21 @@
    * @param lockid Row lock
    * @throws IOException
    */
-  @SuppressWarnings("unchecked")
   public void deleteAllByRegex(final byte [] row, final String columnRegex, 
       final long timestamp, final Integer lockid) throws IOException {
     checkReadOnly();
     Pattern columnPattern = Pattern.compile(columnRegex);
     Integer lid = getLock(lockid, row);
     long now = System.currentTimeMillis();
+    KeyValue kv = new KeyValue(row, timestamp);
     try {
       for (Store store : stores.values()) {
-        List<HStoreKey> keys = store.getKeys(new HStoreKey(row, timestamp),
-            ALL_VERSIONS, now, columnPattern);
-        TreeMap<HStoreKey, byte []> edits = new TreeMap<HStoreKey, byte []>(
-          HStoreKey.getWritableComparator(store.getHRegionInfo()));
-        for (HStoreKey key: keys) {
-          edits.put(key, HLogEdit.DELETED_BYTES);
+        List<KeyValue> keyvalues = new ArrayList<KeyValue>();
+        store.getFull(kv, null, columnPattern, ALL_VERSIONS, null, keyvalues,
+          now);
+        List<KeyValue> edits = new ArrayList<KeyValue>();
+        for (KeyValue key: keyvalues) {
+          edits.add(key.cloneDelete());
         }
         update(edits);
       }
@@ -1498,7 +1584,6 @@
    * @param lockid Row lock
    * @throws IOException
    */
-  @SuppressWarnings("unchecked")
   public void deleteFamily(byte [] row, byte [] family, long timestamp,
       final Integer lockid)
   throws IOException{
@@ -1509,20 +1594,20 @@
       // find the HStore for the column family
       Store store = getStore(family);
       // find all the keys that match our criteria
-      List<HStoreKey> keys = store.getKeys(new HStoreKey(row, timestamp),
-        ALL_VERSIONS, now, null);
+      List<KeyValue> keyvalues = new ArrayList<KeyValue>();
+      store.getFull(new KeyValue(row, timestamp), null, null, ALL_VERSIONS,
+        null, keyvalues, now);
       // delete all the cells
-      TreeMap<HStoreKey, byte []> edits = new TreeMap<HStoreKey, byte []>(
-        HStoreKey.getWritableComparator(store.getHRegionInfo()));
-      for (HStoreKey key: keys) {
-        edits.put(key, HLogEdit.DELETED_BYTES);
+      List<KeyValue> edits = new ArrayList<KeyValue>();
+      for (KeyValue kv: keyvalues) {
+        edits.add(kv.cloneDelete());
       }
       update(edits);
     } finally {
       if(lockid == null) releaseRowLock(lid);
     }
   }
-  
+
   /**
    * Delete all cells for a row with all the matching column families by
    * familyRegex with timestamps less than or equal to <i>timestamp</i>.
@@ -1534,14 +1619,15 @@
    * @param lockid Row lock
    * @throws IOException
    */
-  @SuppressWarnings("unchecked")
-  public void deleteFamilyByRegex(byte [] row, String familyRegex, long timestamp, 
-      final Integer lockid) throws IOException {
+  public void deleteFamilyByRegex(byte [] row, String familyRegex,
+      final long timestamp, final Integer lockid)
+  throws IOException {
     checkReadOnly();
     // construct the family regex pattern
     Pattern familyPattern = Pattern.compile(familyRegex);
     Integer lid = getLock(lockid, row);
     long now = System.currentTimeMillis();
+    KeyValue kv = new KeyValue(row, timestamp);
     try {
       for(Store store: stores.values()) {
         String familyName = Bytes.toString(store.getFamily().getName());
@@ -1549,12 +1635,11 @@
         if(!(familyPattern.matcher(familyName).matches())) 
           continue;
         
-        List<HStoreKey> keys = store.getKeys(new HStoreKey(row, timestamp),
-          ALL_VERSIONS, now, null);
-        TreeMap<HStoreKey, byte []> edits = new TreeMap<HStoreKey, byte []>(
-          HStoreKey.getWritableComparator(store.getHRegionInfo()));
-        for (HStoreKey key: keys) {
-          edits.put(key, HLogEdit.DELETED_BYTES);
+        List<KeyValue> keyvalues = new ArrayList<KeyValue>();
+        store.getFull(kv, null, null, ALL_VERSIONS, null, keyvalues, now);
+        List<KeyValue> edits = new ArrayList<KeyValue>();
+        for (KeyValue k: keyvalues) {
+          edits.add(k.cloneDelete());
         }
         update(edits);
       }
@@ -1574,21 +1659,23 @@
    * {@link HConstants#ALL_VERSIONS} to delete all.
    * @throws IOException
    */
-  @SuppressWarnings("unchecked")
   private void deleteMultiple(final byte [] row, final byte [] column,
       final long ts, final int versions)
   throws IOException {
     checkReadOnly();
-    HStoreKey origin = new HStoreKey(row, column, ts);
-    Set<HStoreKey> keys = getKeys(origin, versions);
-    if (keys.size() > 0) {
-      // I think the below map doesn't have to be exactly storetd.  Its deletes
-      // they don't have to go in in exact sorted order (we don't have to worry
+    // We used to have a getKeys method that purportedly only got the keys and
+    // not the keys and values.  We now just do getFull.  For memcache values,
+    // shouldn't matter if we get key and value since it'll be the entry that
+    // is in memcache.  For the keyvalues from storefile, could be saving if
+    // we only returned key component. TODO.
+    List<KeyValue> keys = get(row, column, ts, versions);
+    if (keys != null && keys.size() > 0) {
+      // I think the below edits don't have to be sorted.  It's deletes.
+      // They don't have to go in in exact sorted order (we don't have to worry
       // about the meta or root sort comparator here).
-      TreeMap<HStoreKey, byte []> edits = new TreeMap<HStoreKey, byte []>(
-        new HStoreKey.HStoreKeyComparator());
-      for (HStoreKey key: keys) {
-        edits.put(key, HLogEdit.DELETED_BYTES);
+      List<KeyValue> edits = new ArrayList<KeyValue>();
+      for (KeyValue key: keys) {
+        edits.add(key.cloneDelete());
       }
       update(edits);
     }
@@ -1610,16 +1697,14 @@
     checkRow(row);
     Integer lid = getLock(lockid, row);
     try {
-      HStoreKey origin;
+      NavigableSet<byte []> columns = null;
       if (column != null) {
-        origin = new HStoreKey(row, column, timestamp);
-      } else {
-        origin = new HStoreKey(row, timestamp);
+        columns = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+        columns.add(column);
       }
-      return !getKeys(origin, 1).isEmpty();
+      return !getFull(row, columns, timestamp, 1, lid).isEmpty();
     } finally {
-      if (lockid == null)
-        releaseRowLock(lid);
+      if (lockid == null) releaseRowLock(lid);
     }
   }
 
@@ -1631,44 +1716,15 @@
       throw new IOException("region is read only");
     }
   }
-  
-  /**
-   * Private implementation.
-   * 
-   * localput() is used for both puts and deletes. We just place the values
-   * into a per-row pending area, until a commit() or abort() call is received.
-   * (Or until the user's write-lock expires.)
-   * 
-   * @param lockid
-   * @param key 
-   * @param val Value to enter into cell
-   * @throws IOException
-   */
-  @SuppressWarnings("unchecked")
-  private void localput(final Integer lockid, final HStoreKey key,
-      final byte [] val)
-  throws IOException {
-    checkColumn(key.getColumn());
-    checkReadOnly();
-    TreeMap<HStoreKey, byte []> targets = this.targetColumns.get(lockid);
-    if (targets == null) {
-      // I think the below map doesn't have to be exactly storetd.  Its deletes
-      // they don't have to go in in exact sorted order (we don't have to worry
-      // about the meta or root sort comparator here).
-      targets = new TreeMap<HStoreKey, byte []>(new HStoreKey.HStoreKeyComparator());
-      this.targetColumns.put(lockid, targets);
-    }
-    targets.put(key, val);
-  }
-  
+
   /** 
    * Add updates first to the hlog and then add values to memcache.
    * Warning: Assumption is caller has lock on passed in row.
-   * @param updatesByColumn Cell updates by column
+   * @param edits Cell updates by column
    * @throws IOException
    */
-  private void update(final TreeMap<HStoreKey, byte []> updatesByColumn) throws IOException {
-    this.update(updatesByColumn, true);
+  private void update(final List<KeyValue> edits) throws IOException {
+    this.update(edits, true);
   }
 
   /** 
@@ -1678,26 +1734,23 @@
    * @param updatesByColumn Cell updates by column
    * @throws IOException
    */
-  private void update(final TreeMap<HStoreKey, byte []> updatesByColumn,
-    boolean writeToWAL)
+  private void update(final List<KeyValue> edits, boolean writeToWAL)
   throws IOException {
-    if (updatesByColumn == null || updatesByColumn.size() <= 0) {
+    if (edits == null || edits.isEmpty()) {
       return;
     }
-    checkReadOnly();
     boolean flush = false;
     this.updatesLock.readLock().lock();
     try {
       if (writeToWAL) {
         this.log.append(regionInfo.getRegionName(),
-          regionInfo.getTableDesc().getName(), updatesByColumn,
+          regionInfo.getTableDesc().getName(), edits,
           (regionInfo.isMetaRegion() || regionInfo.isRootRegion()));
       }
       long size = 0;
-      for (Map.Entry<HStoreKey, byte[]> e: updatesByColumn.entrySet()) {
-        HStoreKey key = e.getKey();
-        size = this.memcacheSize.addAndGet(
-          getStore(key.getColumn()).add(key, e.getValue()));
+      for (KeyValue kv: edits) {
+        // TODO: Fix -- do I have to do a getColumn here?
+        size = this.memcacheSize.addAndGet(getStore(kv.getColumn()).add(kv));
       }
       flush = isFlushSize(size);
     } finally {
@@ -1757,9 +1810,9 @@
    * TODO: Make this lookup faster.
    */
   public Store getStore(final byte [] column) {
-    return this.stores.get(HStoreKey.getFamilyMapKey(column)); 
+    return this.stores.get(column); 
   }
-  
+
   //////////////////////////////////////////////////////////////////////////////
   // Support code
   //////////////////////////////////////////////////////////////////////////////
@@ -1780,21 +1833,14 @@
    * @param columnName
    * @throws NoSuchColumnFamilyException
    */
-  private void checkColumn(final byte [] columnName)
-  throws NoSuchColumnFamilyException, ColumnNameParseException {
-    if (columnName == null) {
+  private void checkColumn(final byte [] column)
+  throws NoSuchColumnFamilyException {
+    if (column == null) {
       return;
     }
-
-    int index = HStoreKey.getFamilyDelimiterIndex(columnName);
-    if (index <= 0) {
-      throw new ColumnNameParseException(Bytes.toString(columnName) +
-        " is missing column family delimiter '" +
-        HStoreKey.COLUMN_FAMILY_DELIMITER + "'");
-    }
-    if (!regionInfo.getTableDesc().hasFamily(columnName, index)) {
+    if (!regionInfo.getTableDesc().hasFamily(column)) {
       throw new NoSuchColumnFamilyException("Column family on " +
-        Bytes.toString(columnName) + " does not exist in region " + this
+        Bytes.toString(column) + " does not exist in region " + this
           + " in table " + regionInfo.getTableDesc());
     }
   }
@@ -1893,7 +1939,7 @@
     if (lockid == null) {
       lid = obtainRowLock(row);
     } else {
-      if(!isRowLocked(lockid)) {
+      if (!isRowLocked(lockid)) {
         throw new IOException("Invalid row lock");
       }
       lid = lockid;
@@ -1939,31 +1985,26 @@
    */
   private class HScanner implements InternalScanner {
     private InternalScanner[] scanners;
-    private TreeMap<byte [], Cell>[] resultSets;
-    private HStoreKey[] keys;
+    private List<KeyValue> [] resultSets;
     private RowFilterInterface filter;
-    private HStoreKey.StoreKeyComparator comparator;
 
     /** Create an HScanner with a handle on many HStores. */
     @SuppressWarnings("unchecked")
-    HScanner(byte [][] cols, byte [] firstRow, long timestamp, Store [] stores,
-      RowFilterInterface filter)
+    HScanner(final NavigableSet<byte []> columns, byte [] firstRow,
+      long timestamp, final Store [] stores, final RowFilterInterface filter)
     throws IOException {
       this.filter = filter;
       this.scanners = new InternalScanner[stores.length];
       try {
-        this.comparator = HStoreKey.getRawComparator(regionInfo);
-        
         for (int i = 0; i < stores.length; i++) {
           // Only pass relevant columns to each store
-          List<byte[]> columns = new ArrayList<byte[]>();
-          for (int j = 0; j < cols.length; j++) {
-            if (Bytes.equals(HStoreKey.getFamily(cols[j]),
-                stores[i].getFamily().getName())) {
-              columns.add(cols[j]);
+          NavigableSet<byte[]> columnSubset =
+            new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+          for (byte [] c: columns) {
+            if (KeyValue.FAMILY_COMPARATOR.compare(stores[i].storeName, c) == 0) {
+              columnSubset.add(c);
             }
           }
-
           RowFilterInterface f = filter;
           if (f != null) {
             // Need to replicate filters.
@@ -1971,12 +2012,11 @@
             // one shared across many rows. See HADOOP-2467.
             f = WritableUtils.clone(filter, conf);
           }
-          scanners[i] = stores[i].getScanner(timestamp,
-              columns.toArray(new byte[columns.size()][]), firstRow, f);
+          scanners[i] = stores[i].getScanner(timestamp, columnSubset, firstRow, f);
         }
       } catch (IOException e) {
         for (int i = 0; i < this.scanners.length; i++) {
-          if(scanners[i] != null) {
+          if (scanners[i] != null) {
             closeScanner(i);
           }
         }
@@ -1985,12 +2025,10 @@
 
       // Advance to the first key in each store.
       // All results will match the required column-set and scanTime.
-      this.resultSets = new TreeMap[scanners.length];
-      this.keys = new HStoreKey[scanners.length];
+      this.resultSets = new List[scanners.length];
       for (int i = 0; i < scanners.length; i++) {
-        keys[i] = new HStoreKey(HConstants.EMPTY_BYTE_ARRAY);
-        resultSets[i] = new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
-        if(scanners[i] != null && !scanners[i].next(keys[i], resultSets[i])) {
+        resultSets[i] = new ArrayList<KeyValue>();
+        if(scanners[i] != null && !scanners[i].next(resultSets[i])) {
           closeScanner(i);
         }
       }
@@ -2000,81 +2038,61 @@
       activeScannerCount.incrementAndGet();
     }
 
-    public boolean next(HStoreKey key, SortedMap<byte [], Cell> results)
+    @SuppressWarnings("null")
+    public boolean next(List<KeyValue> results)
     throws IOException {
       boolean moreToFollow = false;
       boolean filtered = false;
-
       do {
-        // Find the lowest-possible key.
-        byte [] chosenRow = null;
+        // Find the lowest key across all stores.
+        KeyValue chosen = null;
         long chosenTimestamp = -1;
-        for (int i = 0; i < this.keys.length; i++) {
-          if (scanners[i] != null &&
-             (chosenRow == null ||
-               (this.comparator.compareRows(this.keys[i].getRow(), chosenRow) < 0) ||
-               ((this.comparator.compareRows(this.keys[i].getRow(), chosenRow) == 0) &&
-                      (keys[i].getTimestamp() > chosenTimestamp)))) {
-            chosenRow = keys[i].getRow();
-            chosenTimestamp = keys[i].getTimestamp();
+        for (int i = 0; i < this.scanners.length; i++) {
+          if (this.resultSets[i] == null || this.resultSets[i].isEmpty()) {
+            continue;
+          }
+          KeyValue kv = this.resultSets[i].get(0);
+          if (chosen == null ||
+               (comparator.compareRows(kv, chosen) < 0) ||
+               ((comparator.compareRows(kv, chosen) == 0) &&
+                 (kv.getTimestamp() > chosenTimestamp))) {
+            chosen = kv;
+            chosenTimestamp = chosen.getTimestamp();
           }
         }
 
-        // Store the key and results for each sub-scanner. Merge them as
-        // appropriate.
+        // Store results from each sub-scanner.
         if (chosenTimestamp >= 0) {
-          // Here we are setting the passed in key with current row+timestamp
-          key.setRow(chosenRow);
-          key.setVersion(chosenTimestamp);
-          key.setColumn(HConstants.EMPTY_BYTE_ARRAY);
-
           for (int i = 0; i < scanners.length; i++) {
-            if (scanners[i] != null &&
-              this.comparator.compareRows(this.keys[i].getRow(), chosenRow) == 0) {
-              // NOTE: We used to do results.putAll(resultSets[i]);
-              // but this had the effect of overwriting newer
-              // values with older ones. So now we only insert
-              // a result if the map does not contain the key.
-              for (Map.Entry<byte [], Cell> e : resultSets[i].entrySet()) {
-                if (!results.containsKey(e.getKey())) {
-                  results.put(e.getKey(), e.getValue());
-                }
-              }
+            if (this.resultSets[i] == null || this.resultSets[i].isEmpty()) {
+              continue;
+            }
+            KeyValue kv = this.resultSets[i].get(0);
+            if (comparator.compareRows(kv, chosen) == 0) {
+              results.addAll(this.resultSets[i]);
               resultSets[i].clear();
-              if (!scanners[i].next(keys[i], resultSets[i])) {
+              if (!scanners[i].next(resultSets[i])) {
                 closeScanner(i);
               }
             }
           }
         }
 
-        for (int i = 0; i < scanners.length; i++) {
-          // If the current scanner is non-null AND has a lower-or-equal
-          // row label, then its timestamp is bad. We need to advance it.
-          while ((scanners[i] != null) &&
-              (this.comparator.compareRows(this.keys[i].getRow(), chosenRow) <= 0)) {
-            resultSets[i].clear();
-            if (!scanners[i].next(keys[i], resultSets[i])) {
-              closeScanner(i);
-            }
-          }
-        }
-
         moreToFollow = chosenTimestamp >= 0;
         if (results == null || results.size() <= 0) {
           // If we got no results, then there is no more to follow.
           moreToFollow = false;
         }
-        
+
         filtered = filter == null ? false : filter.filterRow(results);
-        
         if (filter != null && filter.filterAllRemaining()) {
           moreToFollow = false;
         }
         
         if (moreToFollow) {
           if (filter != null) {
-            filter.rowProcessed(filtered, key.getRow());
+            filter.rowProcessed(filtered, chosen.getBuffer(), chosen.getRowOffset(),
+              chosen.getRowLength());
           }
           if (filtered) {
             results.clear();
@@ -2108,9 +2126,6 @@
         if (resultSets != null) {
           resultSets[i] = null;
         }
-        if (keys != null) {
-          keys[i] = null;
-        }
       }
     }
 
@@ -2222,7 +2237,6 @@
    *
    * @throws IOException
    */
-  @SuppressWarnings("unchecked")
   public static void addRegionToMETA(HRegion meta, HRegion r) 
   throws IOException {
     meta.checkResources();
@@ -2230,11 +2244,9 @@
     byte [] row = r.getRegionName();
     Integer lid = meta.obtainRowLock(row);
     try {
-      HStoreKey key = new HStoreKey(row, COL_REGIONINFO,
-        System.currentTimeMillis());
-      TreeMap<HStoreKey, byte[]> edits = new TreeMap<HStoreKey, byte[]>(
-        HStoreKey.getWritableComparator(r.getRegionInfo()));
-      edits.put(key, Writables.getBytes(r.getRegionInfo()));
+      List<KeyValue> edits = new ArrayList<KeyValue>();
+      edits.add(new KeyValue(row, COL_REGIONINFO, System.currentTimeMillis(),
+        Writables.getBytes(r.getRegionInfo())));
       meta.update(edits);
     } finally {
       meta.releaseRowLock(lid);
@@ -2389,7 +2401,6 @@
    */
   public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
   throws IOException {
-
     HRegion a = srcA;
     HRegion b = srcB;
 
@@ -2406,7 +2417,7 @@
       b = srcA;
     }
 
-    if (!HStoreKey.equalsTwoRowKeys(a.getEndKey(), b.getStartKey())) {
+    if (!(Bytes.compareTo(a.getEndKey(), b.getStartKey()) == 0)) {
       throw new IOException("Cannot merge non-adjacent regions");
     }
     return merge(a, b);
@@ -2450,17 +2461,25 @@
     HTableDescriptor tabledesc = a.getTableDesc();
     HLog log = a.getLog();
     Path basedir = a.getBaseDir();
-    final byte [] startKey = HStoreKey.equalsTwoRowKeys(a.getStartKey(),
-        EMPTY_BYTE_ARRAY) ||
-      HStoreKey.equalsTwoRowKeys(b.getStartKey(), EMPTY_BYTE_ARRAY)?
-        EMPTY_BYTE_ARRAY: Bytes.compareTo(a.getStartKey(), 
-          b.getStartKey()) <= 0?
+    // Presume both are of same region type -- i.e. both user or catalog 
+    // table regions.  This way can use comparator.
+    final byte [] startKey = a.comparator.matchingRows(a.getStartKey(), 0,
+          a.getStartKey().length,
+        EMPTY_BYTE_ARRAY, 0, EMPTY_BYTE_ARRAY.length) ||
+      b.comparator.matchingRows(b.getStartKey(), 0, b.getStartKey().length,
+        EMPTY_BYTE_ARRAY, 0, EMPTY_BYTE_ARRAY.length)?
+        EMPTY_BYTE_ARRAY:
+          a.comparator.compareRows(a.getStartKey(), 0, a.getStartKey().length, 
+          b.getStartKey(), 0, b.getStartKey().length) <= 0?
         a.getStartKey(): b.getStartKey();
-    final byte [] endKey = HStoreKey.equalsTwoRowKeys(a.getEndKey(),
-        EMPTY_BYTE_ARRAY) ||
-      HStoreKey.equalsTwoRowKeys(b.getEndKey(), EMPTY_BYTE_ARRAY)?
+    final byte [] endKey = a.comparator.matchingRows(a.getEndKey(), 0,
+        a.getEndKey().length, EMPTY_BYTE_ARRAY, 0, EMPTY_BYTE_ARRAY.length) ||
+      a.comparator.matchingRows(b.getEndKey(), 0, b.getEndKey().length,
+        EMPTY_BYTE_ARRAY, 0, EMPTY_BYTE_ARRAY.length)?
         EMPTY_BYTE_ARRAY:
-        Bytes.compareTo(a.getEndKey(), b.getEndKey()) <= 0? b.getEndKey(): a.getEndKey();
+        a.comparator.compareRows(a.getEndKey(), 0, a.getEndKey().length,
+            b.getEndKey(), 0, b.getEndKey().length) <= 0?
+                b.getEndKey(): a.getEndKey();
 
     HRegionInfo newRegionInfo = new HRegionInfo(tabledesc, startKey, endKey);
     LOG.info("Creating new region " + newRegionInfo.toString());
@@ -2591,48 +2610,54 @@
     }
   }
 
-  
-  public long incrementColumnValue(byte[] row, byte[] column, long amount) throws IOException {
+  public long incrementColumnValue(byte[] row, byte[] column, long amount)
+  throws IOException {
     checkRow(row);
     checkColumn(column);
     
     Integer lid = obtainRowLock(row);
     splitsAndClosesLock.readLock().lock();
     try {
-      HStoreKey hsk = new HStoreKey(row, column);
+      KeyValue kv = new KeyValue(row, column);
       long ts = System.currentTimeMillis();
       byte [] value = null;
 
       Store store = getStore(column);
 
-      List<Cell> c;
+      List<KeyValue> c;
       // Try the memcache first.
       store.lock.readLock().lock();
       try {
-        c = store.memcache.get(hsk, 1);
+        c = store.memcache.get(kv, 1);
       } finally {
         store.lock.readLock().unlock();
       }
       // Pick the latest value out of List<Cell> c:
       if (c.size() >= 1) {
         // Use the memcache timestamp value.
-        LOG.debug("Overwriting the memcache value for " + Bytes.toString(row) + "/" + Bytes.toString(column));
+        LOG.debug("Overwriting the memcache value for " + Bytes.toString(row) +
+          "/" + Bytes.toString(column));
         ts = c.get(0).getTimestamp();
         value = c.get(0).getValue();
       }
 
       if (value == null) {
         // Check the store (including disk) for the previous value.
-        Cell[] cell = store.get(hsk, 1);
-        if (cell != null && cell.length >= 1) {
-          LOG.debug("Using HFile previous value for " + Bytes.toString(row) + "/" + Bytes.toString(column));
-          value = cell[0].getValue();
+        c = store.get(kv, 1);
+        if (c != null && c.size() == 1) {
+          LOG.debug("Using HFile previous value for " + Bytes.toString(row) +
+            "/" + Bytes.toString(column));
+          value = c.get(0).getValue();
+        } else if (c != null && c.size() > 1) {
+          throw new DoNotRetryIOException("more than 1 value returned in " +
+            "incrementColumnValue from Store");
         }
       }
       
       if (value == null) {
         // Doesn't exist
-        LOG.debug("Creating new counter value for " + Bytes.toString(row) + "/"+ Bytes.toString(column));
+        LOG.debug("Creating new counter value for " + Bytes.toString(row) +
+          "/"+ Bytes.toString(column));
         value = Bytes.toBytes(amount);
       } else {
         value = incrementBytes(value, amount);
@@ -2652,10 +2677,12 @@
     // Hopefully this doesn't happen too often.
     if (value.length < Bytes.SIZEOF_LONG) {
       byte [] newvalue = new byte[Bytes.SIZEOF_LONG];
-      System.arraycopy(value, 0, newvalue, newvalue.length - value.length, value.length);
+      System.arraycopy(value, 0, newvalue, newvalue.length - value.length,
+        value.length);
       value = newvalue;
     } else if (value.length > Bytes.SIZEOF_LONG) {
-      throw new DoNotRetryIOException("Increment Bytes - value too big: " + value.length);
+      throw new DoNotRetryIOException("Increment Bytes - value too big: " +
+        value.length);
     }
     return binaryIncrement(value, amount);
   }
@@ -2675,4 +2702,4 @@
     }
     return value;
   }
-}
+}
\ No newline at end of file

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=764289&r1=764288&r2=764289&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sun Apr 12 10:39:55 2009
@@ -37,6 +37,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Random;
 import java.util.Set;
 import java.util.SortedMap;
@@ -58,7 +59,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Chore;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HMsg;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -66,8 +66,8 @@
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.HServerLoad;
-import org.apache.hadoop.hbase.HStoreKey;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.LeaseListener;
 import org.apache.hadoop.hbase.Leases;
 import org.apache.hadoop.hbase.LocalHBaseCluster;
@@ -76,13 +76,11 @@
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.UnknownRowLockException;
 import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.ValueOverMaxLengthException;
 import org.apache.hadoop.hbase.HMsg.Type;
 import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.client.ServerConnection;
 import org.apache.hadoop.hbase.client.ServerConnectionManager;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
-import org.apache.hadoop.hbase.io.BatchOperation;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.HbaseMapWritable;
@@ -991,7 +989,7 @@
         memcacheSize += r.memcacheSize.get();
         synchronized (r.stores) {
           stores += r.stores.size();
-          for(Map.Entry<Integer, Store> ee: r.stores.entrySet()) {
+          for(Map.Entry<byte [], Store> ee: r.stores.entrySet()) {
             Store store = ee.getValue(); 
             storefiles += store.getStorefilesCount();
             try {
@@ -1573,13 +1571,15 @@
     return getRegion(regionName).getRegionInfo();
   }
 
-  public Cell[] get(final byte [] regionName, final byte [] row,
+  public Cell [] get(final byte [] regionName, final byte [] row,
     final byte [] column, final long timestamp, final int numVersions) 
   throws IOException {
     checkOpen();
     requestCount.incrementAndGet();
     try {
-      return getRegion(regionName).get(row, column, timestamp, numVersions);
+      List<KeyValue> results =
+        getRegion(regionName).get(row, column, timestamp, numVersions);
+      return Cell.createSingleCellArray(results);
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
@@ -1593,16 +1593,14 @@
     requestCount.incrementAndGet();
     try {
       // convert the columns array into a set so it's easy to check later.
-      Set<byte []> columnSet = null;
+      NavigableSet<byte []> columnSet = null;
       if (columns != null) {
         columnSet = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
         columnSet.addAll(Arrays.asList(columns));
       }
-      
       HRegion region = getRegion(regionName);
       HbaseMapWritable<byte [], Cell> result =
-        region.getFull(row, columnSet, 
-          ts, numVersions, getLockFromId(lockId));
+        region.getFull(row, columnSet, ts, numVersions, getLockFromId(lockId));
       if (result == null || result.isEmpty())
         return null;
       return new RowResult(row, result);
@@ -1632,9 +1630,9 @@
     return rrs.length == 0 ? null : rrs[0];
   }
 
-  public RowResult[] next(final long scannerId, int nbRows) throws IOException {
+  public RowResult [] next(final long scannerId, int nbRows) throws IOException {
     checkOpen();
-    ArrayList<RowResult> resultSets = new ArrayList<RowResult>();
+    List<List<KeyValue>> results = new ArrayList<List<KeyValue>>();
     try {
       String scannerName = String.valueOf(scannerId);
       InternalScanner s = scanners.get(scannerName);
@@ -1642,21 +1640,19 @@
         throw new UnknownScannerException("Name: " + scannerName);
       }
       this.leases.renewLease(scannerName);
-      for(int i = 0; i < nbRows; i++) {
+      for (int i = 0; i < nbRows; i++) {
         requestCount.incrementAndGet();
         // Collect values to be returned here
-        HbaseMapWritable<byte [], Cell> values
-          = new HbaseMapWritable<byte [], Cell>();
-        HStoreKey key = new HStoreKey();
-        while (s.next(key, values)) {
-          if (values.size() > 0) {
+        List<KeyValue> values = new ArrayList<KeyValue>();
+        while (s.next(values)) {
+          if (!values.isEmpty()) {
             // Row has something in it. Return the value.
-            resultSets.add(new RowResult(key.getRow(), values));
+            results.add(values);
             break;
           }
         }
       }
-      return resultSets.toArray(new RowResult[resultSets.size()]);
+      return RowResult.createRowResultArray(results);
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
@@ -1670,7 +1666,6 @@
     checkOpen();
     this.requestCount.incrementAndGet();
     HRegion region = getRegion(regionName);
-    validateValuesLength(b, region);
     try {
       cacheFlusher.reclaimMemcacheMemory();
       region.batchUpdate(b, getLockFromId(b.getRowLock()));
@@ -1689,7 +1684,6 @@
       Integer[] locks = new Integer[b.length];
       for (i = 0; i < b.length; i++) {
         this.requestCount.incrementAndGet();
-        validateValuesLength(b[i], region);
         locks[i] = getLockFromId(b[i].getRowLock());
         region.batchUpdate(b[i], locks[i]);
       }
@@ -1711,7 +1705,6 @@
     checkOpen();
     this.requestCount.incrementAndGet();
     HRegion region = getRegion(regionName);
-    validateValuesLength(b, region);
     try {
       cacheFlusher.reclaimMemcacheMemory();
       return region.checkAndSave(b,
@@ -1720,34 +1713,7 @@
       throw convertThrowableToIOE(cleanup(t));
     }
   }
-  
-  
-  /**
-   * Utility method to verify values length
-   * @param batchUpdate The update to verify
-   * @throws IOException Thrown if a value is too long
-   */
-  private void validateValuesLength(BatchUpdate batchUpdate, 
-      HRegion region) throws IOException {
-    HTableDescriptor desc = region.getTableDesc();
-    for (Iterator<BatchOperation> iter = 
-      batchUpdate.iterator(); iter.hasNext();) {
-      BatchOperation operation = iter.next();
-      if (operation.getValue() != null) {
-        HColumnDescriptor fam = 
-          desc.getFamily(HStoreKey.getFamily(operation.getColumn()));
-        if (fam != null) {
-          int maxLength = fam.getMaxValueLength();
-          if (operation.getValue().length > maxLength) {
-            throw new ValueOverMaxLengthException("Value in column "
-                + Bytes.toString(operation.getColumn()) + " is too long. "
-                + operation.getValue().length + " instead of " + maxLength);
-          }
-        }
-      }
-    }
-  }
-  
+
   //
   // remote scanner interface
   //
@@ -2132,8 +2098,7 @@
     HRegion region = null;
     this.lock.readLock().lock();
     try {
-      Integer key = Integer.valueOf(Bytes.hashCode(regionName));
-      region = onlineRegions.get(key);
+      region = onlineRegions.get(Integer.valueOf(Bytes.hashCode(regionName)));
       if (region == null) {
         throw new NotServingRegionException(regionName);
       }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java?rev=764289&r1=764288&r2=764289&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java Sun Apr 12 10:39:55 2009
@@ -21,9 +21,9 @@
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.SortedMap;
-import org.apache.hadoop.hbase.HStoreKey;
-import org.apache.hadoop.hbase.io.Cell;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
 
 /**
  * Internal scanners differ from client-side scanners in that they operate on
@@ -44,13 +44,11 @@
    * Grab the next row's worth of values. The scanner will return the most
    * recent data value for each row that is not newer than the target time
    * passed when the scanner was created.
-   * @param key will contain the row and timestamp upon return
-   * @param results will contain an entry for each column family member and its
-   * value
+   * @param results
    * @return true if data was returned
    * @throws IOException
    */
-  public boolean next(HStoreKey key, SortedMap<byte [], Cell> results)
+  public boolean next(List<KeyValue> results)
   throws IOException;
   
   /**