You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2009/10/16 01:57:30 UTC

svn commit: r825707 [2/3] - in /hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase: ./ client/ regionserver/

Added: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=825707&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestFromClientSide.java (added)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestFromClientSide.java Thu Oct 15 23:57:29 2009
@@ -0,0 +1,3478 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.filter.WhileMatchFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Run tests that use the HBase clients; {@link HTable} and {@link HTablePool}.
+ * Sets up the HBase mini cluster once at start and runs through all client tests.
+ * Each creates a table named for the method and does its stuff against that.
+ */
+public class TestFromClientSide {
+  // Logger for this test class; static so it is created once for the class
+  // rather than once per instance, and final so it cannot be reassigned.
+  static final Log LOG = LogFactory.getLog(TestFromClientSide.class);
+  // Testing utility used by every test method in this class.
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  // Default row, family, qualifier and value used by the tests below.
+  private static byte [] ROW = Bytes.toBytes("testRow");
+  private static byte [] FAMILY = Bytes.toBytes("testFamily");
+  private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
+  private static byte [] VALUE = Bytes.toBytes("testValue");
+
+  /**
+   * Starts the mini cluster once, before any test in this class runs, by
+   * calling {@code TEST_UTIL.startMiniCluster(3)}.
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+  }
+
+  /**
+   * Shuts the mini cluster down once, after all tests in this class have run.
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Per-test setup hook.  Intentionally empty.
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    // Nothing to do.
+  }
+
+  /**
+   * Per-test teardown hook.  Intentionally empty.
+   * @throws java.lang.Exception
+   */
+  @After
+  public void tearDown() throws Exception {
+    // Nothing to do.
+  }
+
+  /**
+   * Test from client side of an involved filter against a multi family that
+   * involves deletes.
+   *
+   * Writes three rows under each of three random key prefixes, overwrites
+   * them with a second value, deletes the filtered column from every row and
+   * then checks that filtered scans no longer return anything.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testWeirdCacheBehaviour() throws Exception {
+    final byte [] tableName = Bytes.toBytes("testWeirdCacheBehaviour");
+    final byte [][] families = new byte[][] { Bytes.toBytes("trans-blob"),
+        Bytes.toBytes("trans-type"), Bytes.toBytes("trans-date"),
+        Bytes.toBytes("trans-tags"), Bytes.toBytes("trans-group") };
+    HTable ht = TEST_UTIL.createTable(tableName, families);
+    final String firstValue = "this is the value";
+    final String secondValue = "this is some other value";
+    final String [] keyPrefixes = new String [] {
+        UUID.randomUUID().toString(),
+        UUID.randomUUID().toString(),
+        UUID.randomUUID().toString() };
+
+    // First generation of rows, then a second generation with another value.
+    for (String keyPrefix : keyPrefixes) {
+      putRows(ht, 3, firstValue, keyPrefix);
+    }
+    ht.flushCommits();
+    for (String keyPrefix : keyPrefixes) {
+      putRows(ht, 3, secondValue, keyPrefix);
+    }
+
+    // Reads go through a second handle onto the same table.
+    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    for (String keyPrefix : keyPrefixes) {
+      System.out.println("Checking values for key: " + keyPrefix);
+      assertEquals("Got back incorrect number of rows from scan", 3,
+          getNumberOfRows(keyPrefix, secondValue, table));
+    }
+
+    for (String keyPrefix : keyPrefixes) {
+      deleteColumns(ht, secondValue, keyPrefix);
+    }
+
+    System.out.println("Starting important checks.....");
+    for (String keyPrefix : keyPrefixes) {
+      assertEquals("Got back incorrect number of rows from scan: " + keyPrefix,
+        0, getNumberOfRows(keyPrefix, secondValue, table));
+    }
+
+    // NOTE(review): scanner caching is changed on 'ht' while the scans below
+    // run through 'table' -- confirm which handle was meant.
+    ht.setScannerCaching(0);
+    assertEquals("Got back incorrect number of rows from scan", 0,
+      getNumberOfRows(keyPrefixes[0], secondValue, table));
+    ht.setScannerCaching(100);
+    assertEquals("Got back incorrect number of rows from scan", 0,
+      getNumberOfRows(keyPrefixes[1], secondValue, table));
+  }
+
+  /*
+   * Scans for the rows selected by buildScanner(keyPrefix, value, ht) and, for
+   * each one, issues a Delete with deleteColumn on "trans-tags":"qual2".
+   * Asserts that exactly three deletes were performed.
+   * @param ht Table to scan and delete from.
+   * @param value Column value the scanned rows must carry.
+   * @param keyPrefix Row key prefix the scanned rows must start with.
+   * @throws IOException
+   */
+  private void deleteColumns(HTable ht, String value, String keyPrefix)
+  throws IOException {
+    ResultScanner scanner = buildScanner(keyPrefix, value, ht);
+    int count = 0;
+    try {
+      for (Result result : scanner) {
+        Delete delete = new Delete(result.getRow());
+        delete.deleteColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"));
+        ht.delete(delete);
+        count++;
+      }
+    } finally {
+      // Always release the scanner, even if a delete throws.
+      scanner.close();
+    }
+    assertEquals("Did not perform correct number of deletes", 3, count);
+  }
+
+  /*
+   * Counts the rows returned by buildScanner(keyPrefix, value, ht), printing
+   * each row key and its KeyValues to stdout along the way.
+   * @param keyPrefix Row key prefix the rows must start with.
+   * @param value Column value the rows must carry.
+   * @param ht Table to scan.
+   * @return Number of rows the scan returned.
+   * @throws Exception
+   */
+  private int getNumberOfRows(String keyPrefix, String value, HTable ht)
+      throws Exception {
+    ResultScanner resultScanner = buildScanner(keyPrefix, value, ht);
+    int numberOfResults = 0;
+    try {
+      for (Result result : resultScanner) {
+        System.out.println("Got back key: " + Bytes.toString(result.getRow()));
+        for (KeyValue kv : result.raw()) {
+          System.out.println("kv=" + kv.toString() + ", "
+              + Bytes.toString(kv.getValue()));
+        }
+        numberOfResults++;
+      }
+    } finally {
+      // Always release the scanner so repeated calls do not leak it.
+      resultScanner.close();
+    }
+    return numberOfResults;
+  }
+
+  private ResultScanner buildScanner(String keyPrefix, String value, HTable ht)
+      throws IOException {
+    // OurFilterList allFilters = new OurFilterList();
+    FilterList allFilters = new FilterList(/* FilterList.Operator.MUST_PASS_ALL */);
+    allFilters.addFilter(new PrefixFilter(Bytes.toBytes(keyPrefix)));
+    SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes
+        .toBytes("trans-tags"), Bytes.toBytes("qual2"), CompareOp.EQUAL, Bytes
+        .toBytes(value));
+    filter.setFilterIfMissing(true);
+    allFilters.addFilter(filter);
+
+    // allFilters.addFilter(new
+    // RowExcludingSingleColumnValueFilter(Bytes.toBytes("trans-tags"),
+    // Bytes.toBytes("qual2"), CompareOp.EQUAL, Bytes.toBytes(value)));
+
+    Scan scan = new Scan();
+    scan.addFamily(Bytes.toBytes("trans-blob"));
+    scan.addFamily(Bytes.toBytes("trans-type"));
+    scan.addFamily(Bytes.toBytes("trans-date"));
+    scan.addFamily(Bytes.toBytes("trans-tags"));
+    scan.addFamily(Bytes.toBytes("trans-group"));
+    scan.setFilter(allFilters);
+
+    return ht.getScanner(scan);
+  }
+
+  private void putRows(HTable ht, int numRows, String value, String key)
+      throws IOException {
+    for (int i = 0; i < numRows; i++) {
+      String row = key + "_" + UUID.randomUUID().toString();
+      System.out.println(String.format("Saving row: %s, with value %s", row,
+          value));
+      Put put = new Put(Bytes.toBytes(row));
+      put.add(Bytes.toBytes("trans-blob"), null, Bytes
+          .toBytes("value for blob"));
+      put.add(Bytes.toBytes("trans-type"), null, Bytes.toBytes("statement"));
+      put.add(Bytes.toBytes("trans-date"), null, Bytes
+          .toBytes("20090921010101999"));
+      put.add(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"), Bytes
+          .toBytes(value));
+      put.add(Bytes.toBytes("trans-group"), null, Bytes
+          .toBytes("adhocTransactionGroupId"));
+      ht.put(put);
+    }
+  }
+
+  /**
+   * Test filters when multiple regions.  It does counts.  Needs eye-balling of
+   * logs to ensure that we're not scanning more regions than we're supposed to.
+   * Related to the TestFilterAcrossRegions over in the o.a.h.h.filter package.
+   * @throws IOException
+   */
+  @Test
+  public void testFilterAcrossMutlipleRegions() throws IOException {
+    byte [] name = Bytes.toBytes("testFilterAcrossMutlipleRegions");
+    HTable t = TEST_UTIL.createTable(name, FAMILY);
+    int rowCount = TEST_UTIL.loadTable(t, FAMILY);
+    assertRowCount(t, rowCount);
+    // Split the table.  Should split on a reasonable key; 'lqj'
+    Map<HRegionInfo, HServerAddress> regions  = splitTable(t);
+    // Splitting must not change the number of rows in the table.
+    assertRowCount(t, rowCount);
+    // Get end key of first region.
+    byte [] endKey = regions.keySet().iterator().next().getEndKey();
+    // Count rows with a filter that stops us before the passed 'endKey'.
+    // Should be count of rows in first region.
+    int endKeyCount = countRows(t, createScanWithRowFilter(endKey));
+    assertTrue(endKeyCount < rowCount);
+
+    // How do I know I did not go to the second region?  That's tough.  Can't
+    // really do that in client-side region test.  I verified by tracing in
+    // debugger.  I changed the messages that come out when set to DEBUG so
+    // should see when scanner is done. Says "Finished with scanning..." with
+    // region name.  Check that it's finished in right region.
+
+    // New test.  Make it so scan goes into next region by one and then two.
+    // Make sure count comes out right.
+    byte [] key = new byte [] {endKey[0], endKey[1], (byte)(endKey[2] + 1)};
+    int plusOneCount = countRows(t, createScanWithRowFilter(key));
+    assertEquals(endKeyCount + 1, plusOneCount);
+    key = new byte [] {endKey[0], endKey[1], (byte)(endKey[2] + 2)};
+    int plusTwoCount = countRows(t, createScanWithRowFilter(key));
+    assertEquals(endKeyCount + 2, plusTwoCount);
+
+    // New test.  Make it so I scan one less than endkey.
+    key = new byte [] {endKey[0], endKey[1], (byte)(endKey[2] - 1)};
+    int minusOneCount = countRows(t, createScanWithRowFilter(key));
+    assertEquals(endKeyCount - 1, minusOneCount);
+    // For above test... study logs.  Make sure we do "Finished with scanning.."
+    // in first region and that we do not fall into the next region.
+    
+    // EQUAL filter on the key 'aaa' (despite the variable name 'countBBB');
+    // exactly one row is expected to match.
+    key = new byte [] {'a', 'a', 'a'};
+    int countBBB = countRows(t,
+      createScanWithRowFilter(key, null, CompareFilter.CompareOp.EQUAL));
+    assertEquals(1, countBBB);
+
+    int countGreater = countRows(t, createScanWithRowFilter(endKey, null,
+      CompareFilter.CompareOp.GREATER_OR_EQUAL));
+    // Zero because the scan started at the start of the table: the row filter
+    // is wrapped in a WhileMatchFilter, so the first row that sorts below
+    // 'endKey' presumably ends the scan straight away.
+    assertEquals(0, countGreater);
+    // Starting the scan at 'endKey' instead should return every row that was
+    // not counted for the first region.
+    countGreater = countRows(t, createScanWithRowFilter(endKey, endKey,
+      CompareFilter.CompareOp.GREATER_OR_EQUAL));
+    assertEquals(rowCount - endKeyCount, countGreater);
+  }
+
+  /*
+   * @param key
+   * @return Scan with RowFilter that does LESS than passed key.
+   */
+  private Scan createScanWithRowFilter(final byte [] key) {
+    return createScanWithRowFilter(key, null, CompareFilter.CompareOp.LESS);
+  }
+
+  /*
+   * @param key
+   * @param op
+   * @param startRow
+   * @return Scan with RowFilter that does CompareOp op on passed key.
+   */
+  private Scan createScanWithRowFilter(final byte [] key,
+      final byte [] startRow, CompareFilter.CompareOp op) {
+    // Make sure key is of some substance... non-null and > than first key.
+    assertTrue(key != null && key.length > 0 &&
+      Bytes.BYTES_COMPARATOR.compare(key, new byte [] {'a', 'a', 'a'}) >= 0);
+    LOG.info("Key=" + Bytes.toString(key));
+    Scan s = startRow == null? new Scan(): new Scan(startRow);
+    Filter f = new RowFilter(op, new BinaryComparator(key));
+    f = new WhileMatchFilter(f);
+    s.setFilter(f);
+    return s;
+  }
+
+  /*
+   * Runs the passed scan and counts the rows it returns, asserting that every
+   * returned row is non-empty.
+   * @param t Table to scan.
+   * @param s Scan to run.
+   * @return Count of rows returned by the scan.
+   * @throws IOException
+   */
+  private int countRows(final HTable t, final Scan s)
+  throws IOException {
+    ResultScanner scanner = t.getScanner(s);
+    int count = 0;
+    try {
+      for (Result result: scanner) {
+        count++;
+        assertTrue(result.size() > 0);
+        // LOG.info("Count=" + count + ", row=" + Bytes.toString(result.getRow()));
+      }
+    } finally {
+      // This helper is called many times per test; do not leak scanners.
+      scanner.close();
+    }
+    return count;
+  }
+
+  private void assertRowCount(final HTable t, final int expected)
+  throws IOException {
+    assertEquals(expected, countRows(t, new Scan()));
+  }
+
+  /*
+   * Split table into multiple regions.
+   * @param t Table to split.
+   * @return Map of regions to servers.
+   * @throws IOException
+   */
+  private Map<HRegionInfo, HServerAddress> splitTable(final HTable t)
+  throws IOException {
+    // Split this table in two.
+    HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+    admin.split(t.getTableName());
+    Map<HRegionInfo, HServerAddress> regions = waitOnSplit(t);
+    assertTrue(regions.size() > 1);
+    return regions;
+  }
+
+  /*
+   * Wait on table split.  May return because we waited long enough on the split
+   * and it didn't happen.  Caller should check.
+   * @param t
+   * @return Map of table regions; caller needs to check table actually split.
+   */
+  private Map<HRegionInfo, HServerAddress> waitOnSplit(final HTable t)
+  throws IOException {
+    Map<HRegionInfo, HServerAddress> regions = t.getRegionsInfo();
+    int originalCount = regions.size();
+    for (int i = 0; i < TEST_UTIL.getConfiguration().getInt("hbase.test.retries", 30); i++) {
+      Thread.currentThread();
+      try {
+        Thread.sleep(TEST_UTIL.getConfiguration().getInt("hbase.server.thread.wakefrequency", 1000));
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      regions = t.getRegionsInfo();
+      if (regions.size() > originalCount) break;
+    }
+    return regions;
+  }
+
+  /**
+   * Puts a single cell, then scans for a qualifier that was never written and
+   * expects the scanner to return nothing.
+   * @throws Exception
+   */
+  @Test
+  public void testSuperSimple() throws Exception {
+    byte [] TABLE = Bytes.toBytes("testSuperSimple");
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILY);
+    Put put = new Put(ROW);
+    put.add(FAMILY, QUALIFIER, VALUE);
+    ht.put(put);
+    // The table name doubles as a qualifier that no put above has written.
+    Scan scan = new Scan();
+    scan.addColumn(FAMILY, TABLE);
+    ResultScanner scanner = ht.getScanner(scan);
+    try {
+      Result result = scanner.next();
+      assertTrue("Expected null result", result == null);
+    } finally {
+      // Close the scanner even when the assertion fails.
+      scanner.close();
+    }
+    System.out.println("Done.");
+  }
+
+  /**
+   * Writes ten rows, each with its own "colN-..." qualifier, and checks that a
+   * QualifierFilter with the regex "col[1-5]" returns exactly rows 1 to 5, in
+   * order, with one cell each.
+   * @throws Exception
+   */
+  @Test
+  public void testFilters() throws Exception {
+    byte [] TABLE = Bytes.toBytes("testFilters");
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILY);
+    byte [][] ROWS = makeN(ROW, 10);
+    byte [][] QUALIFIERS = {
+        Bytes.toBytes("col0-<d2v1>-<d3v2>"), Bytes.toBytes("col1-<d2v1>-<d3v2>"),
+        Bytes.toBytes("col2-<d2v1>-<d3v2>"), Bytes.toBytes("col3-<d2v1>-<d3v2>"),
+        Bytes.toBytes("col4-<d2v1>-<d3v2>"), Bytes.toBytes("col5-<d2v1>-<d3v2>"),
+        Bytes.toBytes("col6-<d2v1>-<d3v2>"), Bytes.toBytes("col7-<d2v1>-<d3v2>"),
+        Bytes.toBytes("col8-<d2v1>-<d3v2>"), Bytes.toBytes("col9-<d2v1>-<d3v2>")
+    };
+    for(int i=0;i<10;i++) {
+      Put put = new Put(ROWS[i]);
+      put.add(FAMILY, QUALIFIERS[i], VALUE);
+      ht.put(put);
+    }
+    Scan scan = new Scan();
+    scan.addFamily(FAMILY);
+    Filter filter = new QualifierFilter(CompareOp.EQUAL,
+      new RegexStringComparator("col[1-5]"));
+    scan.setFilter(filter);
+    // Open a single scanner, iterate that one, and always close it.
+    ResultScanner scanner = ht.getScanner(scan);
+    int expectedIndex = 1;
+    try {
+      for(Result result : scanner) {
+        assertEquals(1, result.size());
+        assertTrue(Bytes.equals(result.raw()[0].getRow(), ROWS[expectedIndex]));
+        assertTrue(Bytes.equals(result.raw()[0].getQualifier(),
+            QUALIFIERS[expectedIndex]));
+        expectedIndex++;
+      }
+    } finally {
+      scanner.close();
+    }
+    // Rows 1 through 5 matched, so the index must have advanced to 6.
+    assertEquals(6, expectedIndex);
+  }
+
+  /**
+   * Test simple table and non-existent row cases.
+   * Runs gets and scans against the empty table, inserts a single row at
+   * ROWS[2], then checks that gets and scans of the surrounding rows stay
+   * empty while the inserted row can be read back.
+   * @throws Exception
+   */
+  @Test
+  public void testSimpleMissing() throws Exception {
+    byte [] TABLE = Bytes.toBytes("testSimpleMissing");
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILY);
+    byte [][] ROWS = makeN(ROW, 4);
+    
+    // Try to get a row on an empty table
+    Get get = new Get(ROWS[0]);
+    Result result = ht.get(get);
+    assertEmptyResult(result);
+    
+    get = new Get(ROWS[0]);
+    get.addFamily(FAMILY);
+    result = ht.get(get);
+    assertEmptyResult(result);
+    
+    get = new Get(ROWS[0]);
+    get.addColumn(FAMILY, QUALIFIER);
+    result = ht.get(get);
+    assertEmptyResult(result);
+    
+    // Try to scan the empty table
+    Scan scan = new Scan();
+    result = getSingleScanResult(ht, scan);
+    assertNullResult(result);
+    
+    
+    scan = new Scan(ROWS[0]);
+    result = getSingleScanResult(ht, scan);
+    assertNullResult(result);
+    
+    scan = new Scan(ROWS[0],ROWS[1]);
+    result = getSingleScanResult(ht, scan);
+    assertNullResult(result);
+    
+    scan = new Scan();
+    scan.addFamily(FAMILY);
+    result = getSingleScanResult(ht, scan);
+    assertNullResult(result);
+    
+    scan = new Scan();
+    scan.addColumn(FAMILY, QUALIFIER);
+    result = getSingleScanResult(ht, scan);
+    assertNullResult(result);
+    
+    // Insert a row
+    
+    Put put = new Put(ROWS[2]);
+    put.add(FAMILY, QUALIFIER, VALUE);
+    ht.put(put);
+    
+    // Try to get empty rows around it
+    
+    get = new Get(ROWS[1]);
+    result = ht.get(get);
+    assertEmptyResult(result);
+    
+    get = new Get(ROWS[0]);
+    get.addFamily(FAMILY);
+    result = ht.get(get);
+    assertEmptyResult(result);
+    
+    get = new Get(ROWS[3]);
+    get.addColumn(FAMILY, QUALIFIER);
+    result = ht.get(get);
+    assertEmptyResult(result);
+    
+    // Try to scan empty rows around it
+    
+    scan = new Scan(ROWS[3]);
+    result = getSingleScanResult(ht, scan);
+    assertNullResult(result);
+    
+    // ROWS[2] is the second argument here and the scan is expected to come
+    // back empty.
+    scan = new Scan(ROWS[0],ROWS[2]);
+    result = getSingleScanResult(ht, scan);
+    assertNullResult(result);
+    
+    // Make sure we can actually get the row
+    
+    get = new Get(ROWS[2]);
+    result = ht.get(get);
+    assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE);
+    
+    get = new Get(ROWS[2]);
+    get.addFamily(FAMILY);
+    result = ht.get(get);
+    assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE);
+    
+    get = new Get(ROWS[2]);
+    get.addColumn(FAMILY, QUALIFIER);
+    result = ht.get(get);
+    assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE);
+    
+    // Make sure we can scan the row
+    
+    scan = new Scan();
+    result = getSingleScanResult(ht, scan);
+    assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE);
+    
+    scan = new Scan(ROWS[0],ROWS[3]);
+    result = getSingleScanResult(ht, scan);
+    assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE);
+    
+    scan = new Scan(ROWS[2],ROWS[3]);
+    result = getSingleScanResult(ht, scan);
+    assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE);
+  }
+  
+  /**
+   * Test basic puts, gets, scans, and deletes for a single row
+   * in a multiple family table.
+   * The same checks are repeated after calls to TEST_UTIL.flush() so that,
+   * per the section comments below, reads are exercised against both the
+   * memstore and storefiles.
+   * @throws Exception
+   */
+  @Test
+  public void testSingleRowMultipleFamily() throws Exception {
+    byte [] TABLE = Bytes.toBytes("testSingleRowMultipleFamily");
+    byte [][] ROWS = makeN(ROW, 3);
+    byte [][] FAMILIES = makeNAscii(FAMILY, 10);
+    byte [][] QUALIFIERS = makeN(QUALIFIER, 10);
+    byte [][] VALUES = makeN(VALUE, 10);
+   
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES);
+    
+    Get get;
+    Scan scan;
+    Delete delete;
+    Put put;
+    Result result;
+    
+    ////////////////////////////////////////////////////////////////////////////
+    // Insert one column to one family
+    ////////////////////////////////////////////////////////////////////////////
+    
+    put = new Put(ROWS[0]);
+    put.add(FAMILIES[4], QUALIFIERS[0], VALUES[0]);
+    ht.put(put);
+    
+    // Get the single column
+    getVerifySingleColumn(ht, ROWS, 0, FAMILIES, 4, QUALIFIERS, 0, VALUES, 0);
+    
+    // Scan the single column
+    scanVerifySingleColumn(ht, ROWS, 0, FAMILIES, 4, QUALIFIERS, 0, VALUES, 0);
+    
+    // Get empty results around inserted column
+    getVerifySingleEmpty(ht, ROWS, 0, FAMILIES, 4, QUALIFIERS, 0);
+    
+    // Scan empty results around inserted column
+    scanVerifySingleEmpty(ht, ROWS, 0, FAMILIES, 4, QUALIFIERS, 0);
+    
+    ////////////////////////////////////////////////////////////////////////////
+    // Flush memstore and run same tests from storefiles
+    ////////////////////////////////////////////////////////////////////////////
+    
+    TEST_UTIL.flush();
+    
+    // Redo get and scan tests from storefile
+    getVerifySingleColumn(ht, ROWS, 0, FAMILIES, 4, QUALIFIERS, 0, VALUES, 0);
+    scanVerifySingleColumn(ht, ROWS, 0, FAMILIES, 4, QUALIFIERS, 0, VALUES, 0);
+    getVerifySingleEmpty(ht, ROWS, 0, FAMILIES, 4, QUALIFIERS, 0);
+    scanVerifySingleEmpty(ht, ROWS, 0, FAMILIES, 4, QUALIFIERS, 0);
+    
+    ////////////////////////////////////////////////////////////////////////////
+    // Now, Test reading from memstore and storefiles at once
+    ////////////////////////////////////////////////////////////////////////////
+    
+    // Insert multiple columns across families 2, 4, 6, 7 and 9
+    put = new Put(ROWS[0]);
+    put.add(FAMILIES[2], QUALIFIERS[2], VALUES[2]);
+    put.add(FAMILIES[2], QUALIFIERS[4], VALUES[4]);
+    put.add(FAMILIES[4], QUALIFIERS[4], VALUES[4]);
+    put.add(FAMILIES[6], QUALIFIERS[6], VALUES[6]);
+    put.add(FAMILIES[6], QUALIFIERS[7], VALUES[7]);
+    put.add(FAMILIES[7], QUALIFIERS[7], VALUES[7]);
+    put.add(FAMILIES[9], QUALIFIERS[0], VALUES[0]);
+    ht.put(put);
+    
+    // Get multiple columns across multiple families and get empties around it
+    singleRowGetTest(ht, ROWS, FAMILIES, QUALIFIERS, VALUES);
+ 
+    // Scan multiple columns across multiple families and scan empties around it
+    singleRowScanTest(ht, ROWS, FAMILIES, QUALIFIERS, VALUES);
+
+    ////////////////////////////////////////////////////////////////////////////
+    // Flush the table again
+    ////////////////////////////////////////////////////////////////////////////
+    
+    TEST_UTIL.flush();
+    
+    // Redo tests again
+    singleRowGetTest(ht, ROWS, FAMILIES, QUALIFIERS, VALUES);
+    singleRowScanTest(ht, ROWS, FAMILIES, QUALIFIERS, VALUES);
+    
+    // Insert more data to memstore
+    put = new Put(ROWS[0]);
+    put.add(FAMILIES[6], QUALIFIERS[5], VALUES[5]);
+    put.add(FAMILIES[6], QUALIFIERS[8], VALUES[8]);
+    put.add(FAMILIES[6], QUALIFIERS[9], VALUES[9]);
+    put.add(FAMILIES[4], QUALIFIERS[3], VALUES[3]);
+    ht.put(put);
+    
+    ////////////////////////////////////////////////////////////////////////////
+    // Delete a storefile column
+    ////////////////////////////////////////////////////////////////////////////
+    delete = new Delete(ROWS[0]);
+    delete.deleteColumns(FAMILIES[6], QUALIFIERS[7]);
+    ht.delete(delete);
+    
+    // Try to get deleted column
+    get = new Get(ROWS[0]);
+    get.addColumn(FAMILIES[6], QUALIFIERS[7]);
+    result = ht.get(get);
+    assertEmptyResult(result);
+    
+    // Try to scan deleted column
+    scan = new Scan();
+    scan.addColumn(FAMILIES[6], QUALIFIERS[7]);
+    result = getSingleScanResult(ht, scan);
+    assertNullResult(result);
+    
+    // Make sure we can still get a column before it and after it
+    get = new Get(ROWS[0]);
+    get.addColumn(FAMILIES[6], QUALIFIERS[6]);
+    result = ht.get(get);
+    assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[6], VALUES[6]);
+    
+    get = new Get(ROWS[0]);
+    get.addColumn(FAMILIES[6], QUALIFIERS[8]);
+    result = ht.get(get);
+    assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[8], VALUES[8]);
+    
+    // Make sure we can still scan a column before it and after it
+    scan = new Scan();
+    scan.addColumn(FAMILIES[6], QUALIFIERS[6]);
+    result = getSingleScanResult(ht, scan);
+    assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[6], VALUES[6]);
+    
+    scan = new Scan();
+    scan.addColumn(FAMILIES[6], QUALIFIERS[8]);
+    result = getSingleScanResult(ht, scan);
+    assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[8], VALUES[8]);
+    
+    ////////////////////////////////////////////////////////////////////////////
+    // Delete a memstore column
+    ////////////////////////////////////////////////////////////////////////////
+    delete = new Delete(ROWS[0]);
+    delete.deleteColumns(FAMILIES[6], QUALIFIERS[8]);
+    ht.delete(delete);
+    
+    // Try to get deleted column
+    get = new Get(ROWS[0]);
+    get.addColumn(FAMILIES[6], QUALIFIERS[8]);
+    result = ht.get(get);
+    assertEmptyResult(result);
+    
+    // Try to scan deleted column
+    scan = new Scan();
+    scan.addColumn(FAMILIES[6], QUALIFIERS[8]);
+    result = getSingleScanResult(ht, scan);
+    assertNullResult(result);
+    
+    // Make sure we can still get a column before it and after it
+    get = new Get(ROWS[0]);
+    get.addColumn(FAMILIES[6], QUALIFIERS[6]);
+    result = ht.get(get);
+    assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[6], VALUES[6]);
+    
+    get = new Get(ROWS[0]);
+    get.addColumn(FAMILIES[6], QUALIFIERS[9]);
+    result = ht.get(get);
+    assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[9], VALUES[9]);
+    
+    // Make sure we can still scan a column before it and after it
+    scan = new Scan();
+    scan.addColumn(FAMILIES[6], QUALIFIERS[6]);
+    result = getSingleScanResult(ht, scan);
+    assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[6], VALUES[6]);
+    
+    scan = new Scan();
+    scan.addColumn(FAMILIES[6], QUALIFIERS[9]);
+    result = getSingleScanResult(ht, scan);
+    assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[9], VALUES[9]);
+    
+    ////////////////////////////////////////////////////////////////////////////
+    // Delete joint storefile/memstore family
+    ////////////////////////////////////////////////////////////////////////////
+    
+    delete = new Delete(ROWS[0]);
+    delete.deleteFamily(FAMILIES[4]);
+    ht.delete(delete);
+    
+    // Try to get storefile column in deleted family
+    get = new Get(ROWS[0]);
+    get.addColumn(FAMILIES[4], QUALIFIERS[4]);
+    result = ht.get(get);
+    assertEmptyResult(result);
+    
+    // Try to get memstore column in deleted family
+    get = new Get(ROWS[0]);
+    get.addColumn(FAMILIES[4], QUALIFIERS[3]);
+    result = ht.get(get);
+    assertEmptyResult(result);
+    
+    // Try to get deleted family
+    get = new Get(ROWS[0]);
+    get.addFamily(FAMILIES[4]);
+    result = ht.get(get);
+    assertEmptyResult(result);
+    
+    // Try to scan storefile column in deleted family
+    scan = new Scan();
+    scan.addColumn(FAMILIES[4], QUALIFIERS[4]);
+    result = getSingleScanResult(ht, scan);
+    assertNullResult(result);
+    
+    // Try to scan memstore column in deleted family
+    scan = new Scan();
+    scan.addColumn(FAMILIES[4], QUALIFIERS[3]);
+    result = getSingleScanResult(ht, scan);
+    assertNullResult(result);
+    
+    // Try to scan deleted family
+    scan = new Scan();
+    scan.addFamily(FAMILIES[4]);
+    result = getSingleScanResult(ht, scan);
+    assertNullResult(result);
+    
+    // Make sure we can still get another family
+    get = new Get(ROWS[0]);
+    get.addColumn(FAMILIES[2], QUALIFIERS[2]);
+    result = ht.get(get);
+    assertSingleResult(result, ROWS[0], FAMILIES[2], QUALIFIERS[2], VALUES[2]);
+    
+    get = new Get(ROWS[0]);
+    get.addColumn(FAMILIES[6], QUALIFIERS[9]);
+    result = ht.get(get);
+    assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[9], VALUES[9]);
+    
+    // Make sure we can still scan another family
+    scan = new Scan();
+    scan.addColumn(FAMILIES[6], QUALIFIERS[6]);
+    result = getSingleScanResult(ht, scan);
+    assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[6], VALUES[6]);
+    
+    scan = new Scan();
+    scan.addColumn(FAMILIES[6], QUALIFIERS[9]);
+    result = getSingleScanResult(ht, scan);
+    assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[9], VALUES[9]);
+    
+    ////////////////////////////////////////////////////////////////////////////
+    // Flush everything and rerun delete tests
+    ////////////////////////////////////////////////////////////////////////////
+    
+    TEST_UTIL.flush();
+    
+    // Try to get storefile column in deleted family
+    get = new Get(ROWS[0]);
+    get.addColumn(FAMILIES[4], QUALIFIERS[4]);
+    result = ht.get(get);
+    assertEmptyResult(result);
+    
+    // Try to get memstore column in deleted family
+    get = new Get(ROWS[0]);
+    get.addColumn(FAMILIES[4], QUALIFIERS[3]);
+    result = ht.get(get);
+    assertEmptyResult(result);
+    
+    // Try to get deleted family
+    get = new Get(ROWS[0]);
+    get.addFamily(FAMILIES[4]);
+    result = ht.get(get);
+    assertEmptyResult(result);
+    
+    // Try to scan storefile column in deleted family
+    scan = new Scan();
+    scan.addColumn(FAMILIES[4], QUALIFIERS[4]);
+    result = getSingleScanResult(ht, scan);
+    assertNullResult(result);
+    
+    // Try to scan memstore column in deleted family
+    scan = new Scan();
+    scan.addColumn(FAMILIES[4], QUALIFIERS[3]);
+    result = getSingleScanResult(ht, scan);
+    assertNullResult(result);
+    
+    // Try to scan deleted family
+    scan = new Scan();
+    scan.addFamily(FAMILIES[4]);
+    result = getSingleScanResult(ht, scan);
+    assertNullResult(result);
+    
+    // Make sure we can still get another family
+    get = new Get(ROWS[0]);
+    get.addColumn(FAMILIES[2], QUALIFIERS[2]);
+    result = ht.get(get);
+    assertSingleResult(result, ROWS[0], FAMILIES[2], QUALIFIERS[2], VALUES[2]);
+    
+    get = new Get(ROWS[0]);
+    get.addColumn(FAMILIES[6], QUALIFIERS[9]);
+    result = ht.get(get);
+    assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[9], VALUES[9]);
+    
+    // Make sure we can still scan another family
+    scan = new Scan();
+    scan.addColumn(FAMILIES[6], QUALIFIERS[6]);
+    result = getSingleScanResult(ht, scan);
+    assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[6], VALUES[6]);
+    
+    scan = new Scan();
+    scan.addColumn(FAMILIES[6], QUALIFIERS[9]);
+    result = getSingleScanResult(ht, scan);
+    assertSingleResult(result, ROWS[0], FAMILIES[6], QUALIFIERS[9], VALUES[9]);
+    
+  }
+
+  /**
+   * Checks how the client handles null and empty arguments.  A null table
+   * name, a null family and a null row are expected to be rejected with an
+   * exception; a null qualifier, an empty (zero-length) qualifier and a null
+   * value are expected to work through put, get, scan and delete.
+   *
+   * @throws Exception if table creation or any client call that is expected
+   *     to succeed fails
+   */
+  @Test
+  public void testNull() throws Exception {
+    byte [] TABLE = Bytes.toBytes("testNull");
+
+    // Null table name (should NOT work).
+    // The outcome is recorded in a flag and asserted after the try block so
+    // that the catch clause cannot swallow the failure signal itself.
+    boolean rejected = false;
+    try {
+      TEST_UTIL.createTable(null, FAMILY);
+    } catch(Exception expected) {
+      rejected = true;
+    }
+    assertTrue("Creating a table with null name passed, should have failed",
+        rejected);
+
+    // Null family (should NOT work)
+    rejected = false;
+    try {
+      TEST_UTIL.createTable(TABLE, (byte[])null);
+    } catch(Exception expected) {
+      rejected = true;
+    }
+    assertTrue("Creating a table with a null family passed, should fail",
+        rejected);
+
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILY);
+
+    // Null row (should NOT work)
+    rejected = false;
+    try {
+      Put put = new Put((byte[])null);
+      put.add(FAMILY, QUALIFIER, VALUE);
+      ht.put(put);
+    } catch(Exception expected) {
+      rejected = true;
+    }
+    assertTrue("Inserting a null row worked, should throw exception",
+        rejected);
+
+    // Null qualifier (should work)
+    {
+      Put put = new Put(ROW);
+      put.add(FAMILY, null, VALUE);
+      ht.put(put);
+
+      getTestNull(ht, ROW, FAMILY, VALUE);
+
+      scanTestNull(ht, ROW, FAMILY, VALUE);
+
+      Delete delete = new Delete(ROW);
+      delete.deleteColumns(FAMILY, null);
+      ht.delete(delete);
+
+      Get get = new Get(ROW);
+      Result result = ht.get(get);
+      assertEmptyResult(result);
+    }
+
+    // Use a new table
+    byte [] TABLE2 = Bytes.toBytes("testNull2");
+    ht = TEST_UTIL.createTable(TABLE2, FAMILY);
+
+    // Empty qualifier, byte[0] instead of null (should work)
+    try {
+      Put put = new Put(ROW);
+      put.add(FAMILY, HConstants.EMPTY_BYTE_ARRAY, VALUE);
+      ht.put(put);
+
+      getTestNull(ht, ROW, FAMILY, VALUE);
+
+      scanTestNull(ht, ROW, FAMILY, VALUE);
+
+      // Flush and try again
+
+      TEST_UTIL.flush();
+
+      getTestNull(ht, ROW, FAMILY, VALUE);
+
+      scanTestNull(ht, ROW, FAMILY, VALUE);
+
+      Delete delete = new Delete(ROW);
+      delete.deleteColumns(FAMILY, HConstants.EMPTY_BYTE_ARRAY);
+      ht.delete(delete);
+
+      Get get = new Get(ROW);
+      Result result = ht.get(get);
+      assertEmptyResult(result);
+
+    } catch(Exception e) {
+      // Keep the original exception as the cause so the real failure is visible
+      throw new IOException(
+          "Using a row with an empty qualifier threw exception, should work", e);
+    }
+
+    // Null value (should work)
+    try {
+      Put put = new Put(ROW);
+      put.add(FAMILY, QUALIFIER, null);
+      ht.put(put);
+
+      Get get = new Get(ROW);
+      get.addColumn(FAMILY, QUALIFIER);
+      Result result = ht.get(get);
+      assertSingleResult(result, ROW, FAMILY, QUALIFIER, null);
+
+      Scan scan = new Scan();
+      scan.addColumn(FAMILY, QUALIFIER);
+      result = getSingleScanResult(ht, scan);
+      assertSingleResult(result, ROW, FAMILY, QUALIFIER, null);
+
+      Delete delete = new Delete(ROW);
+      delete.deleteColumns(FAMILY, QUALIFIER);
+      ht.delete(delete);
+
+      get = new Get(ROW);
+      result = ht.get(get);
+      assertEmptyResult(result);
+
+    } catch(Exception e) {
+      // Keep the original exception as the cause so the real failure is visible
+      throw new IOException(
+          "Null values should be allowed, but threw exception", e);
+    }
+  }
+
+  /**
+   * Writes several timestamped versions of one column and reads them back
+   * through both Get and Scan: one version at a time, limited by the query's
+   * maxVersions, and with all versions requested.  The checks are repeated
+   * after a flush, after adding more versions, and after deleting two
+   * specific versions.
+   */
+  @Test
+  public void testVersions() throws Exception {
+    byte [] tableName = Bytes.toBytes("testVersions");
+
+    long [] stamps = makeStamps(20);
+    byte [][] values = makeNAscii(VALUE, 20);
+
+    // NOTE(review): the third argument (10) is presumably the family's
+    // version limit; the 10-element expectations further down rely on it.
+    HTable table = TEST_UTIL.createTable(tableName, FAMILY, 10);
+
+    Get get;
+    Scan scan;
+    Result result;
+
+    // First batch: versions 1, 2, 4 and 5 of the same column
+    int [] firstBatch = {1, 2, 4, 5};
+    Put put = new Put(ROW);
+    for (int v : firstBatch) {
+      put.add(FAMILY, QUALIFIER, stamps[v], values[v]);
+    }
+    table.put(put);
+
+    int [] neverWritten = {0, 3, 6};
+    long [] newestTwoStamps = {stamps[4], stamps[5]};
+    byte [][] newestTwoValues = {values[4], values[5]};
+
+    // Pass 0 checks straight after the put, pass 1 repeats the same checks
+    // once everything has been flushed.
+    for (int pass = 0; pass < 2; pass++) {
+      if (pass == 1) {
+        TEST_UTIL.flush();
+      }
+
+      // Every written version can be fetched individually
+      for (int v : firstBatch) {
+        getVersionAndVerify(table, ROW, FAMILY, QUALIFIER, stamps[v], values[v]);
+      }
+      for (int v : firstBatch) {
+        scanVersionAndVerify(table, ROW, FAMILY, QUALIFIER, stamps[v], values[v]);
+      }
+
+      // Timestamps that were never written must not come back
+      for (int v : neverWritten) {
+        getVersionAndVerifyMissing(table, ROW, FAMILY, QUALIFIER, stamps[v]);
+      }
+      for (int v : neverWritten) {
+        scanVersionAndVerifyMissing(table, ROW, FAMILY, QUALIFIER, stamps[v]);
+      }
+
+      // A query-level maxVersions of 2 yields only the two newest versions
+      get = new Get(ROW);
+      get.addColumn(FAMILY, QUALIFIER);
+      get.setMaxVersions(2);
+      result = table.get(get);
+      assertNResult(result, ROW, FAMILY, QUALIFIER,
+          newestTwoStamps, newestTwoValues, 0, 1);
+
+      scan = new Scan(ROW);
+      scan.addColumn(FAMILY, QUALIFIER);
+      scan.setMaxVersions(2);
+      result = getSingleScanResult(table, scan);
+      assertNResult(result, ROW, FAMILY, QUALIFIER,
+          newestTwoStamps, newestTwoValues, 0, 1);
+    }
+
+    // Second batch (not flushed): versions 3, 6, 7 and 8
+    int [] secondBatch = {3, 6, 7, 8};
+    put = new Put(ROW);
+    for (int v : secondBatch) {
+      put.add(FAMILY, QUALIFIER, stamps[v], values[v]);
+    }
+    table.put(put);
+
+    // Expect versions 1 through 8, i.e. everything written so far
+    long [] stamps1to8 = new long[8];
+    byte [][] values1to8 = new byte[8][];
+    System.arraycopy(stamps, 1, stamps1to8, 0, 8);
+    System.arraycopy(values, 1, values1to8, 0, 8);
+
+    // setMaxVersions() without an argument, first with the column named
+    // explicitly and then with no column restriction at all
+    boolean [] columnChoices = {true, false};
+    for (boolean nameColumn : columnChoices) {
+      get = new Get(ROW);
+      if (nameColumn) {
+        get.addColumn(FAMILY, QUALIFIER);
+      }
+      get.setMaxVersions();
+      result = table.get(get);
+      assertNResult(result, ROW, FAMILY, QUALIFIER,
+          stamps1to8, values1to8, 0, 7);
+
+      scan = new Scan(ROW);
+      if (nameColumn) {
+        scan.addColumn(FAMILY, QUALIFIER);
+      }
+      scan.setMaxVersions();
+      result = getSingleScanResult(table, scan);
+      assertNResult(result, ROW, FAMILY, QUALIFIER,
+          stamps1to8, values1to8, 0, 7);
+    }
+
+    // Spot-check single versions from both batches
+    int [] spotChecks = {1, 2, 4, 7};
+    for (int v : spotChecks) {
+      getVersionAndVerify(table, ROW, FAMILY, QUALIFIER, stamps[v], values[v]);
+    }
+    for (int v : spotChecks) {
+      scanVersionAndVerify(table, ROW, FAMILY, QUALIFIER, stamps[v], values[v]);
+    }
+
+    // Timestamps still unwritten at this point
+    int [] stillMissing = {0, 9};
+    for (int v : stillMissing) {
+      getVersionAndVerifyMissing(table, ROW, FAMILY, QUALIFIER, stamps[v]);
+    }
+    for (int v : stillMissing) {
+      scanVersionAndVerifyMissing(table, ROW, FAMILY, QUALIFIER, stamps[v]);
+    }
+
+    // Ensure maxVersions of table is respected
+
+    TEST_UTIL.flush();
+
+    // Third batch: versions 9, 11, 13 and 15, twelve versions in total
+    int [] thirdBatch = {9, 11, 13, 15};
+    put = new Put(ROW);
+    for (int v : thirdBatch) {
+      put.add(FAMILY, QUALIFIER, stamps[v], values[v]);
+    }
+    table.put(put);
+
+    // Only the ten newest versions are expected back
+    int [] newestTen = {3, 4, 5, 6, 7, 8, 9, 11, 13, 15};
+    long [] expectedStamps = new long[newestTen.length];
+    byte [][] expectedValues = new byte[newestTen.length][];
+    for (int i = 0; i < newestTen.length; i++) {
+      expectedStamps[i] = stamps[newestTen[i]];
+      expectedValues[i] = values[newestTen[i]];
+    }
+
+    get = new Get(ROW);
+    get.addColumn(FAMILY, QUALIFIER);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    result = table.get(get);
+    assertNResult(result, ROW, FAMILY, QUALIFIER,
+        expectedStamps, expectedValues, 0, 9);
+
+    scan = new Scan(ROW);
+    scan.addColumn(FAMILY, QUALIFIER);
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    result = getSingleScanResult(table, scan);
+    assertNResult(result, ROW, FAMILY, QUALIFIER,
+        expectedStamps, expectedValues, 0, 9);
+
+    // Delete a version in the memstore and a version in a storefile
+    Delete delete = new Delete(ROW);
+    delete.deleteColumn(FAMILY, QUALIFIER, stamps[11]);
+    delete.deleteColumn(FAMILY, QUALIFIER, stamps[7]);
+    table.delete(delete);
+
+    // Versions 7 and 11 are gone; the expectation now includes versions 1
+    // and 2 again, keeping the result at ten entries.
+    int [] afterDelete = {1, 2, 3, 4, 5, 6, 8, 9, 13, 15};
+    expectedStamps = new long[afterDelete.length];
+    expectedValues = new byte[afterDelete.length][];
+    for (int i = 0; i < afterDelete.length; i++) {
+      expectedStamps[i] = stamps[afterDelete[i]];
+      expectedValues[i] = values[afterDelete[i]];
+    }
+
+    get = new Get(ROW);
+    get.addColumn(FAMILY, QUALIFIER);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    result = table.get(get);
+    assertNResult(result, ROW, FAMILY, QUALIFIER,
+        expectedStamps, expectedValues, 0, 9);
+
+    scan = new Scan(ROW);
+    scan.addColumn(FAMILY, QUALIFIER);
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    result = getSingleScanResult(table, scan);
+    assertNResult(result, ROW, FAMILY, QUALIFIER,
+        expectedStamps, expectedValues, 0, 9);
+
+  }
+
+  @Test
+  public void testVersionLimits() throws Exception {
+    byte [] TABLE = Bytes.toBytes("testVersionLimits");
+    byte [][] FAMILIES = makeNAscii(FAMILY, 3);
+    int [] LIMITS = {1,3,5};
+    long [] STAMPS = makeStamps(10);
+    byte [][] VALUES = makeNAscii(VALUE, 10);
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, LIMITS); 
+    
+    // Insert limit + 1 on each family
+    Put put = new Put(ROW);
+    put.add(FAMILIES[0], QUALIFIER, STAMPS[0], VALUES[0]);
+    put.add(FAMILIES[0], QUALIFIER, STAMPS[1], VALUES[1]);
+    put.add(FAMILIES[1], QUALIFIER, STAMPS[0], VALUES[0]);
+    put.add(FAMILIES[1], QUALIFIER, STAMPS[1], VALUES[1]);
+    put.add(FAMILIES[1], QUALIFIER, STAMPS[2], VALUES[2]);
+    put.add(FAMILIES[1], QUALIFIER, STAMPS[3], VALUES[3]);
+    put.add(FAMILIES[2], QUALIFIER, STAMPS[0], VALUES[0]);
+    put.add(FAMILIES[2], QUALIFIER, STAMPS[1], VALUES[1]);
+    put.add(FAMILIES[2], QUALIFIER, STAMPS[2], VALUES[2]);
+    put.add(FAMILIES[2], QUALIFIER, STAMPS[3], VALUES[3]);
+    put.add(FAMILIES[2], QUALIFIER, STAMPS[4], VALUES[4]);
+    put.add(FAMILIES[2], QUALIFIER, STAMPS[5], VALUES[5]);
+    put.add(FAMILIES[2], QUALIFIER, STAMPS[6], VALUES[6]);
+    ht.put(put);
+    
+    // Verify we only get the right number out of each
+
+    // Family0
+    
+    Get get = new Get(ROW);
+    get.addColumn(FAMILIES[0], QUALIFIER);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    Result result = ht.get(get);
+    assertNResult(result, ROW, FAMILIES[0], QUALIFIER, 
+        new long [] {STAMPS[1]},
+        new byte[][] {VALUES[1]},
+        0, 0);
+    
+    get = new Get(ROW);
+    get.addFamily(FAMILIES[0]);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    result = ht.get(get);
+    assertNResult(result, ROW, FAMILIES[0], QUALIFIER, 
+        new long [] {STAMPS[1]},
+        new byte[][] {VALUES[1]},
+        0, 0);
+    
+    Scan scan = new Scan(ROW);
+    scan.addColumn(FAMILIES[0], QUALIFIER);
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    result = getSingleScanResult(ht, scan);
+    assertNResult(result, ROW, FAMILIES[0], QUALIFIER, 
+        new long [] {STAMPS[1]},
+        new byte[][] {VALUES[1]},
+        0, 0);
+    
+    scan = new Scan(ROW);
+    scan.addFamily(FAMILIES[0]);
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    result = getSingleScanResult(ht, scan);
+    assertNResult(result, ROW, FAMILIES[0], QUALIFIER, 
+        new long [] {STAMPS[1]},
+        new byte[][] {VALUES[1]},
+        0, 0);
+    
+    // Family1
+    
+    get = new Get(ROW);
+    get.addColumn(FAMILIES[1], QUALIFIER);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    result = ht.get(get);
+    assertNResult(result, ROW, FAMILIES[1], QUALIFIER, 
+        new long [] {STAMPS[1], STAMPS[2], STAMPS[3]},
+        new byte[][] {VALUES[1], VALUES[2], VALUES[3]},
+        0, 2);
+    
+    get = new Get(ROW);
+    get.addFamily(FAMILIES[1]);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    result = ht.get(get);
+    assertNResult(result, ROW, FAMILIES[1], QUALIFIER, 
+        new long [] {STAMPS[1], STAMPS[2], STAMPS[3]},
+        new byte[][] {VALUES[1], VALUES[2], VALUES[3]},
+        0, 2);
+    
+    scan = new Scan(ROW);
+    scan.addColumn(FAMILIES[1], QUALIFIER);
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    result = getSingleScanResult(ht, scan);
+    assertNResult(result, ROW, FAMILIES[1], QUALIFIER, 
+        new long [] {STAMPS[1], STAMPS[2], STAMPS[3]},
+        new byte[][] {VALUES[1], VALUES[2], VALUES[3]},
+        0, 2);
+    
+    scan = new Scan(ROW);
+    scan.addFamily(FAMILIES[1]);
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    result = getSingleScanResult(ht, scan);
+    assertNResult(result, ROW, FAMILIES[1], QUALIFIER, 
+        new long [] {STAMPS[1], STAMPS[2], STAMPS[3]},
+        new byte[][] {VALUES[1], VALUES[2], VALUES[3]},
+        0, 2);
+    
+    // Family2
+    
+    get = new Get(ROW);
+    get.addColumn(FAMILIES[2], QUALIFIER);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    result = ht.get(get);
+    assertNResult(result, ROW, FAMILIES[2], QUALIFIER, 
+        new long [] {STAMPS[2], STAMPS[3], STAMPS[4], STAMPS[5], STAMPS[6]},
+        new byte[][] {VALUES[2], VALUES[3], VALUES[4], VALUES[5], VALUES[6]},
+        0, 4);
+    
+    get = new Get(ROW);
+    get.addFamily(FAMILIES[2]);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    result = ht.get(get);
+    assertNResult(result, ROW, FAMILIES[2], QUALIFIER, 
+        new long [] {STAMPS[2], STAMPS[3], STAMPS[4], STAMPS[5], STAMPS[6]},
+        new byte[][] {VALUES[2], VALUES[3], VALUES[4], VALUES[5], VALUES[6]},
+        0, 4);
+    
+    scan = new Scan(ROW);
+    scan.addColumn(FAMILIES[2], QUALIFIER);
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    result = getSingleScanResult(ht, scan);
+    assertNResult(result, ROW, FAMILIES[2], QUALIFIER, 
+        new long [] {STAMPS[2], STAMPS[3], STAMPS[4], STAMPS[5], STAMPS[6]},
+        new byte[][] {VALUES[2], VALUES[3], VALUES[4], VALUES[5], VALUES[6]},
+        0, 4);
+    
+    scan = new Scan(ROW);
+    scan.addFamily(FAMILIES[2]);
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    result = getSingleScanResult(ht, scan);
+    assertNResult(result, ROW, FAMILIES[2], QUALIFIER, 
+        new long [] {STAMPS[2], STAMPS[3], STAMPS[4], STAMPS[5], STAMPS[6]},
+        new byte[][] {VALUES[2], VALUES[3], VALUES[4], VALUES[5], VALUES[6]},
+        0, 4);
+    
+    // Try all families
+
+    get = new Get(ROW);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    result = ht.get(get);
+    assertTrue("Expected 9 keys but received " + result.size(),
+        result.size() == 9);
+    
+    get = new Get(ROW);
+    get.addFamily(FAMILIES[0]);
+    get.addFamily(FAMILIES[1]);
+    get.addFamily(FAMILIES[2]);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    result = ht.get(get);
+    assertTrue("Expected 9 keys but received " + result.size(),
+        result.size() == 9);
+    
+    get = new Get(ROW);
+    get.addColumn(FAMILIES[0], QUALIFIER);
+    get.addColumn(FAMILIES[1], QUALIFIER);
+    get.addColumn(FAMILIES[2], QUALIFIER);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    result = ht.get(get);
+    assertTrue("Expected 9 keys but received " + result.size(),
+        result.size() == 9);
+    
+    scan = new Scan(ROW);
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    result = getSingleScanResult(ht, scan);
+    assertTrue("Expected 9 keys but received " + result.size(),
+        result.size() == 9);
+    
+    scan = new Scan(ROW);
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    scan.addFamily(FAMILIES[0]);
+    scan.addFamily(FAMILIES[1]);
+    scan.addFamily(FAMILIES[2]);
+    result = getSingleScanResult(ht, scan);
+    assertTrue("Expected 9 keys but received " + result.size(),
+        result.size() == 9);
+    
+    scan = new Scan(ROW);
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    scan.addColumn(FAMILIES[0], QUALIFIER);
+    scan.addColumn(FAMILIES[1], QUALIFIER);
+    scan.addColumn(FAMILIES[2], QUALIFIER);
+    result = getSingleScanResult(ht, scan);
+    assertTrue("Expected 9 keys but received " + result.size(),
+        result.size() == 9);
+    
+  }
+
+  /**
+   * Exercises the delete operations of the client API against one table with
+   * three families: deleteFamily (with and without a timestamp),
+   * deleteColumn, deleteColumns, re-putting previously deleted versions,
+   * deleting a family before any data is written to the row, and deleting a
+   * list of rows in a single call.  Each step is verified through Get and,
+   * in most cases, Scan.  The steps build on each other, so statement order
+   * matters.
+   */
+  @Test
+  public void testDeletes() throws Exception {
+    byte [] TABLE = Bytes.toBytes("testDeletes");
+    
+    byte [][] ROWS = makeNAscii(ROW, 6);
+    byte [][] FAMILIES = makeNAscii(FAMILY, 3);
+    byte [][] VALUES = makeN(VALUE, 5);
+    long [] ts = {1000, 2000, 3000, 4000, 5000};
+    
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES);
+    
+    // Two versions of one column, then a family delete stamped with ts[0]
+    Put put = new Put(ROW);
+    put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]);
+    put.add(FAMILIES[0], QUALIFIER, ts[1], VALUES[1]);
+    ht.put(put);
+    
+    Delete delete = new Delete(ROW);
+    delete.deleteFamily(FAMILIES[0], ts[0]);
+    ht.delete(delete);
+    
+    // Only the ts[1] version is expected to remain
+    Get get = new Get(ROW);
+    get.addFamily(FAMILIES[0]);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    Result result = ht.get(get);
+    assertNResult(result, ROW, FAMILIES[0], QUALIFIER, 
+        new long [] {ts[1]},
+        new byte[][] {VALUES[1]},
+        0, 0);
+    
+    Scan scan = new Scan(ROW);
+    scan.addFamily(FAMILIES[0]);
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    result = getSingleScanResult(ht, scan);
+    assertNResult(result, ROW, FAMILIES[0], QUALIFIER, 
+        new long [] {ts[1]},
+        new byte[][] {VALUES[1]},
+        0, 0);
+    
+    // Test delete latest version
+    // Versions ts[2]..ts[4] are added under both QUALIFIER and the null
+    // qualifier
+    put = new Put(ROW);
+    put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]);
+    put.add(FAMILIES[0], QUALIFIER, ts[2], VALUES[2]);
+    put.add(FAMILIES[0], QUALIFIER, ts[3], VALUES[3]);
+    put.add(FAMILIES[0], null, ts[4], VALUES[4]);
+    put.add(FAMILIES[0], null, ts[2], VALUES[2]);
+    put.add(FAMILIES[0], null, ts[3], VALUES[3]);
+    ht.put(put);
+    
+    // deleteColumn without a timestamp: the assertions below expect the
+    // newest version (ts[4]) to be gone and ts[1]..ts[3] to remain
+    delete = new Delete(ROW);
+    delete.deleteColumn(FAMILIES[0], QUALIFIER);
+    ht.delete(delete);
+    
+    get = new Get(ROW);
+    get.addColumn(FAMILIES[0], QUALIFIER);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    result = ht.get(get);
+    assertNResult(result, ROW, FAMILIES[0], QUALIFIER, 
+        new long [] {ts[1], ts[2], ts[3]},
+        new byte[][] {VALUES[1], VALUES[2], VALUES[3]},
+        0, 2);
+    
+    scan = new Scan(ROW);
+    scan.addColumn(FAMILIES[0], QUALIFIER);
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    result = getSingleScanResult(ht, scan);
+    assertNResult(result, ROW, FAMILIES[0], QUALIFIER, 
+        new long [] {ts[1], ts[2], ts[3]},
+        new byte[][] {VALUES[1], VALUES[2], VALUES[3]},
+        0, 2);
+    
+    // Test for HBASE-1847
+    // (deleteColumn with a null qualifier; no assertion follows, so this only
+    // checks that the call completes)
+    delete = new Delete(ROW);
+    delete.deleteColumn(FAMILIES[0], null);
+    ht.delete(delete);
+    
+    // Cleanup null qualifier
+    delete = new Delete(ROW);
+    delete.deleteColumns(FAMILIES[0], null);
+    ht.delete(delete);
+    
+    // Expected client behavior might be that you can re-put deleted values
+    // But alas, this is not to be.  We can't put them back in either case.
+    
+    put = new Put(ROW);
+    put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]);
+    put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]);
+    ht.put(put);
+    
+    // The Get returns the latest value but then does not return the
+    // oldest, which was never deleted, ts[1]. 
+    
+    get = new Get(ROW);
+    get.addFamily(FAMILIES[0]);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    result = ht.get(get);
+    assertNResult(result, ROW, FAMILIES[0], QUALIFIER, 
+        new long [] {ts[2], ts[3], ts[4]},
+        new byte[][] {VALUES[2], VALUES[3], VALUES[4]},
+        0, 2);
+    
+    // The Scanner returns the previous values, the expected-unexpected behavior
+    
+    scan = new Scan(ROW);
+    scan.addFamily(FAMILIES[0]);
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    result = getSingleScanResult(ht, scan);
+    assertNResult(result, ROW, FAMILIES[0], QUALIFIER, 
+        new long [] {ts[1], ts[2], ts[3]},
+        new byte[][] {VALUES[1], VALUES[2], VALUES[3]},
+        0, 2);
+    
+    // Test deleting an entire family from one row but not the other various ways
+    
+    // ROWS[0..2] all start with the same four cells: two versions in
+    // FAMILIES[1] and two versions in FAMILIES[2]
+    put = new Put(ROWS[0]);
+    put.add(FAMILIES[1], QUALIFIER, ts[0], VALUES[0]);
+    put.add(FAMILIES[1], QUALIFIER, ts[1], VALUES[1]);
+    put.add(FAMILIES[2], QUALIFIER, ts[2], VALUES[2]);
+    put.add(FAMILIES[2], QUALIFIER, ts[3], VALUES[3]);
+    ht.put(put);
+    
+    put = new Put(ROWS[1]);
+    put.add(FAMILIES[1], QUALIFIER, ts[0], VALUES[0]);
+    put.add(FAMILIES[1], QUALIFIER, ts[1], VALUES[1]);
+    put.add(FAMILIES[2], QUALIFIER, ts[2], VALUES[2]);
+    put.add(FAMILIES[2], QUALIFIER, ts[3], VALUES[3]);
+    ht.put(put);
+    
+    put = new Put(ROWS[2]);
+    put.add(FAMILIES[1], QUALIFIER, ts[0], VALUES[0]);
+    put.add(FAMILIES[1], QUALIFIER, ts[1], VALUES[1]);
+    put.add(FAMILIES[2], QUALIFIER, ts[2], VALUES[2]);
+    put.add(FAMILIES[2], QUALIFIER, ts[3], VALUES[3]);
+    ht.put(put);
+    
+    // ROWS[0]: delete the whole of FAMILIES[2]
+    delete = new Delete(ROWS[0]);
+    delete.deleteFamily(FAMILIES[2]);
+    ht.delete(delete);
+    
+    // ROWS[1]: deleteColumns on the FAMILIES[1] column
+    delete = new Delete(ROWS[1]);
+    delete.deleteColumns(FAMILIES[1], QUALIFIER);
+    ht.delete(delete);
+    
+    // ROWS[2]: deleteColumn is issued twice on FAMILIES[1] and once on
+    // FAMILIES[2]; the assertions below expect nothing left in FAMILIES[1]
+    // and only ts[2] left in FAMILIES[2].
+    // NOTE(review): the repeated call looks intentional (each one presumably
+    // removes one more "latest" version) -- confirm before removing it.
+    delete = new Delete(ROWS[2]);
+    delete.deleteColumn(FAMILIES[1], QUALIFIER);
+    delete.deleteColumn(FAMILIES[1], QUALIFIER);
+    delete.deleteColumn(FAMILIES[2], QUALIFIER);
+    ht.delete(delete);
+    
+    // ROWS[0] keeps its two FAMILIES[1] versions
+    get = new Get(ROWS[0]);
+    get.addFamily(FAMILIES[1]);
+    get.addFamily(FAMILIES[2]);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    result = ht.get(get);
+    assertTrue("Expected 2 keys but received " + result.size(),
+        result.size() == 2);
+    assertNResult(result, ROWS[0], FAMILIES[1], QUALIFIER, 
+        new long [] {ts[0], ts[1]},
+        new byte[][] {VALUES[0], VALUES[1]},
+        0, 1);
+    
+    scan = new Scan(ROWS[0]);
+    scan.addFamily(FAMILIES[1]);
+    scan.addFamily(FAMILIES[2]);
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    result = getSingleScanResult(ht, scan);
+    assertTrue("Expected 2 keys but received " + result.size(),
+        result.size() == 2);
+    assertNResult(result, ROWS[0], FAMILIES[1], QUALIFIER, 
+        new long [] {ts[0], ts[1]},
+        new byte[][] {VALUES[0], VALUES[1]},
+        0, 1);
+    
+    // ROWS[1] is expected to hold two keys; only the count is checked here
+    get = new Get(ROWS[1]);
+    get.addFamily(FAMILIES[1]);
+    get.addFamily(FAMILIES[2]);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    result = ht.get(get);
+    assertTrue("Expected 2 keys but received " + result.size(),
+        result.size() == 2);
+
+    scan = new Scan(ROWS[1]);
+    scan.addFamily(FAMILIES[1]);
+    scan.addFamily(FAMILIES[2]);
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    result = getSingleScanResult(ht, scan);
+    assertTrue("Expected 2 keys but received " + result.size(),
+        result.size() == 2);
+    
+    // ROWS[2] is expected to hold just the ts[2] version in FAMILIES[2]
+    get = new Get(ROWS[2]);
+    get.addFamily(FAMILIES[1]);
+    get.addFamily(FAMILIES[2]);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    result = ht.get(get);
+    assertTrue("Expected 1 key but received " + result.size(),
+        result.size() == 1);
+    assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER, 
+        new long [] {ts[2]},
+        new byte[][] {VALUES[2]},
+        0, 0);
+
+    scan = new Scan(ROWS[2]);
+    scan.addFamily(FAMILIES[1]);
+    scan.addFamily(FAMILIES[2]);
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    result = getSingleScanResult(ht, scan);
+    assertTrue("Expected 1 key but received " + result.size(),
+        result.size() == 1);
+    assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER, 
+        new long [] {ts[2]},
+        new byte[][] {VALUES[2]},
+        0, 0);
+    
+    // Test if we delete the family first in one row (HBASE-1541)
+    
+    // The family delete on ROWS[3] is issued before any put to that row
+    delete = new Delete(ROWS[3]);
+    delete.deleteFamily(FAMILIES[1]);
+    ht.delete(delete);
+    
+    put = new Put(ROWS[3]);
+    put.add(FAMILIES[2], QUALIFIER, VALUES[0]);
+    ht.put(put);
+    
+    put = new Put(ROWS[4]);
+    put.add(FAMILIES[1], QUALIFIER, VALUES[1]);
+    put.add(FAMILIES[2], QUALIFIER, VALUES[2]);
+    ht.put(put);
+    
+    get = new Get(ROWS[3]);
+    get.addFamily(FAMILIES[1]);
+    get.addFamily(FAMILIES[2]);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    result = ht.get(get);
+    assertTrue("Expected 1 key but received " + result.size(),
+        result.size() == 1);
+    
+    get = new Get(ROWS[4]);
+    get.addFamily(FAMILIES[1]);
+    get.addFamily(FAMILIES[2]);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    result = ht.get(get);
+    assertTrue("Expected 2 keys but received " + result.size(),
+        result.size() == 2);
+
+    // A scan starting at ROWS[3] should return ROWS[3] (one key) and then
+    // ROWS[4] (two keys)
+    scan = new Scan(ROWS[3]);
+    scan.addFamily(FAMILIES[1]);
+    scan.addFamily(FAMILIES[2]);
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    ResultScanner scanner = ht.getScanner(scan);
+    result = scanner.next();
+    assertTrue("Expected 1 key but received " + result.size(),
+        result.size() == 1);
+    assertTrue(Bytes.equals(result.sorted()[0].getRow(), ROWS[3]));
+    assertTrue(Bytes.equals(result.sorted()[0].getValue(), VALUES[0]));
+    result = scanner.next();
+    assertTrue("Expected 2 keys but received " + result.size(),
+        result.size() == 2);
+    assertTrue(Bytes.equals(result.sorted()[0].getRow(), ROWS[4]));
+    assertTrue(Bytes.equals(result.sorted()[1].getRow(), ROWS[4]));
+    assertTrue(Bytes.equals(result.sorted()[0].getValue(), VALUES[1]));
+    assertTrue(Bytes.equals(result.sorted()[1].getValue(), VALUES[2]));
+    scanner.close();
+    
+    // Add test of bulk deleting.
+    // Ten rows keyed by the bytes of the ints 0..9, one cell each
+    for (int i = 0; i < 10; i++) {
+      byte [] bytes = Bytes.toBytes(i);
+      put = new Put(bytes);
+      put.add(FAMILIES[0], QUALIFIER, bytes);
+      ht.put(put);
+    }
+    // Every row is readable before the delete
+    for (int i = 0; i < 10; i++) {
+      byte [] bytes = Bytes.toBytes(i);
+      get = new Get(bytes);
+      get.addFamily(FAMILIES[0]);
+      result = ht.get(get);
+      assertTrue(result.size() == 1);
+    }
+    // Collect one family delete per row and submit them in a single call
+    ArrayList<Delete> deletes = new ArrayList<Delete>();
+    for (int i = 0; i < 10; i++) {
+      byte [] bytes = Bytes.toBytes(i);
+      delete = new Delete(bytes);
+      delete.deleteFamily(FAMILIES[0]);
+      deletes.add(delete);
+    }
+    ht.delete(deletes);
+    // Every row must now come back empty
+    for (int i = 0; i < 10; i++) {
+      byte [] bytes = Bytes.toBytes(i);
+      get = new Get(bytes);
+      get.addFamily(FAMILIES[0]);
+      result = ht.get(get);
+      assertTrue(result.size() == 0);
+    }
+  }
+
+  /**
+   * Placeholder for a baseline "scalability" test.
+   *
+   * Intended coverage: one hundred families, one million columns and one
+   * million versions.  Nothing is implemented yet, so the test is ignored.
+   */
+  @Ignore
+  @Test
+  public void testMillions() throws Exception {
+    // Planned steps, none implemented:
+    //   - 100 families
+    //   - millions of columns
+    //   - millions of versions
+  }
+
+  /**
+   * Placeholder for a test of batched puts against a table that is split
+   * into multiple regions.  Nothing is implemented yet, so the test is
+   * ignored.
+   */
+  @Ignore
+  @Test
+  public void testMultipleRegionsAndBatchPuts() throws Exception {
+    // Planned steps, none implemented:
+    //   1. Two family table
+    //   2. Insert lots of rows
+    //   3. Insert to the same row with batched puts
+    //   4. Insert to multiple rows with batched puts
+    //   5. Split the table
+    //   6. Get row from first region
+    //   7. Get row from second region
+    //   8. Scan all rows
+    //   9. Insert to multiple regions with batched puts
+    //  10. Get row from first region
+    //  11. Get row from second region
+    //  12. Scan all rows
+  }
+
+  /**
+   * Placeholder test; the body is empty and the test is disabled through
+   * {@code @Ignore}.
+   */
+  @Ignore @Test
+  public void testMultipleRowMultipleFamily() throws Exception {
+    // TODO: not implemented yet
+    
+  }
+
+  //
+  // JIRA Testers
+  //
+  
+  /**
+   * HBASE-867
+   *    If millions of columns in a column family, hbase scanner won't come up
+   *
+   *    Writes numRows rows of numColsPerRow columns each (a single version
+   *    per column), then reads everything back with a Get of the last row
+   *    and a full Scan. The read-back is done twice: once right after the
+   *    puts and once more after a flush.
+   *
+   *    To test at scale, raise numColsPerRow into the millions
+   *    (that has not been made to work when running as junit though)
+   */
+  @Test
+  public void testJiraTest867() throws Exception {
+    final int numRows = 10;
+    final int numColsPerRow = 2000;
+
+    byte [] tableName = Bytes.toBytes("testJiraTest867");
+    byte [][] rowKeys = makeN(ROW, numRows);
+    byte [][] quals = makeN(QUALIFIER, numColsPerRow);
+
+    HTable ht = TEST_UTIL.createTable(tableName, FAMILY);
+
+    // Write every row; each column stores its own qualifier as the value
+    for (int r = 0; r < numRows; r++) {
+      Put put = new Put(rowKeys[r]);
+      for (int c = 0; c < numColsPerRow; c++) {
+        put.add(FAMILY, quals[c], quals[c]);
+      }
+      assertTrue("Put expected to contain " + numColsPerRow + " columns but " +
+          "only contains " + put.size(), put.size() == numColsPerRow);
+      ht.put(put);
+    }
+
+    byte [] lastRow = rowKeys[numRows - 1];
+
+    // Pass 0 verifies right after the puts, pass 1 verifies after a flush
+    for (int pass = 0; pass < 2; pass++) {
+      if (pass == 1) {
+        TEST_UTIL.flush();
+      }
+
+      // Get the last row and check every column of it
+      Result fetched = ht.get(new Get(lastRow));
+      assertNumKeys(fetched, numColsPerRow);
+      KeyValue [] fetchedKeys = fetched.sorted();
+      for (int c = 0; c < fetched.size(); c++) {
+        assertKey(fetchedKeys[c], lastRow, FAMILY, quals[c], quals[c]);
+      }
+
+      // Scan the whole table and check every column of every row
+      ResultScanner scanner = ht.getScanner(new Scan());
+      int rowCount = 0;
+      for (Result scanned = scanner.next(); scanned != null;
+          scanned = scanner.next()) {
+        assertNumKeys(scanned, numColsPerRow);
+        KeyValue [] scannedKeys = scanned.sorted();
+        for (int c = 0; c < numColsPerRow; c++) {
+          assertKey(scannedKeys[c], rowKeys[rowCount], FAMILY, quals[c],
+              quals[c]);
+        }
+        rowCount++;
+      }
+      scanner.close();
+      assertTrue("Expected to scan " + numRows + " rows but actually scanned "
+          + rowCount + " rows", rowCount == numRows);
+    }
+
+  }
+  
+  /**
+   * HBASE-861
+   *    get with timestamp will return a value if there is a version with an
+   *    earlier timestamp
+   *
+   *    Stores versions at selected timestamps and checks, through
+   *    getVersionAndVerify / getVersionAndVerifyMissing, which exact
+   *    timestamps can be fetched. Each round of checks is run once before
+   *    and once after a flush.
+   */
+  @Test
+  public void testJiraTest861() throws Exception {
+
+    byte [] tableName = Bytes.toBytes("testJiraTest861");
+    byte [][] vals = makeNAscii(VALUE, 7);
+    long [] ts = makeStamps(7);
+
+    HTable ht = TEST_UTIL.createTable(tableName, FAMILY, 10);
+
+    // Insert three versions, added in the order 3, 2, 4
+    Put put = new Put(ROW);
+    for (int idx : new int [] { 3, 2, 4 }) {
+      put.add(FAMILY, QUALIFIER, ts[idx], vals[idx]);
+    }
+    ht.put(put);
+
+    // Stamp 2 must be found; the never-written stamps 1 and 5 must not.
+    // Pass 0 runs before the flush, pass 1 after it.
+    for (int pass = 0; pass < 2; pass++) {
+      if (pass == 1) {
+        TEST_UTIL.flush();
+      }
+      getVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, ts[2], vals[2]);
+      getVersionAndVerifyMissing(ht, ROW, FAMILY, QUALIFIER, ts[1]);
+      getVersionAndVerifyMissing(ht, ROW, FAMILY, QUALIFIER, ts[5]);
+    }
+
+    // Insert two more versions surrounding the others
+    put = new Put(ROW);
+    put.add(FAMILY, QUALIFIER, ts[0], vals[0]);
+    put.add(FAMILY, QUALIFIER, ts[6], vals[6]);
+    ht.put(put);
+
+    // Walk all seven stamps: 1 and 5 were never written, the rest must be
+    // retrievable. Pass 0 runs before the second flush, pass 1 after it.
+    for (int pass = 0; pass < 2; pass++) {
+      if (pass == 1) {
+        TEST_UTIL.flush();
+      }
+      for (int idx = 0; idx < 7; idx++) {
+        if (idx == 1 || idx == 5) {
+          getVersionAndVerifyMissing(ht, ROW, FAMILY, QUALIFIER, ts[idx]);
+        } else {
+          getVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, ts[idx], vals[idx]);
+        }
+      }
+    }
+
+  }
+  
+  /**
+   * HBASE-33
+   *    Add a HTable get/obtainScanner method that retrieves all versions of a
+   *    particular column and row between two timestamps
+   *
+   *    Stores six versions of one column and reads several index ranges of
+   *    them back, with gets and with scans, before and after a flush.
+   */
+  @Test
+  public void testJiraTest33() throws Exception {
+
+    byte [] tableName = Bytes.toBytes("testJiraTest33");
+    byte [][] vals = makeNAscii(VALUE, 7);
+    long [] ts = makeStamps(7);
+
+    HTable ht = TEST_UTIL.createTable(tableName, FAMILY, 10);
+
+    // Insert versions 0 through 5 in a single Put
+    Put put = new Put(ROW);
+    for (int v = 0; v <= 5; v++) {
+      put.add(FAMILY, QUALIFIER, ts[v], vals[v]);
+    }
+    ht.put(put);
+
+    // {start, end} index pairs handed to the range helpers
+    int [][] ranges = { {0, 5}, {0, 2}, {4, 5}, {2, 3} };
+
+    // Pass 0 runs before the flush, pass 1 after it
+    for (int pass = 0; pass < 2; pass++) {
+      if (pass == 1) {
+        TEST_UTIL.flush();
+      }
+      for (int [] range : ranges) {
+        getVersionRangeAndVerify(ht, ROW, FAMILY, QUALIFIER, ts, vals,
+            range[0], range[1]);
+      }
+      for (int [] range : ranges) {
+        scanVersionRangeAndVerify(ht, ROW, FAMILY, QUALIFIER, ts, vals,
+            range[0], range[1]);
+      }
+    }
+
+  }
+  
+  /**
+   * HBASE-1014
+   *    commit(BatchUpdate) method should return timestamp
+   *
+   *    Writes one cell with an explicitly chosen timestamp and checks that it
+   *    is found at exactly that timestamp and not at the two adjacent ones.
+   */
+  @Test
+  public void testJiraTest1014() throws Exception {
+
+    final long manualStamp = 12345;
+
+    byte [] tableName = Bytes.toBytes("testJiraTest1014");
+    HTable ht = TEST_UTIL.createTable(tableName, FAMILY, 10);
+
+    // Insert a single version carrying the hand-picked timestamp
+    Put put = new Put(ROW);
+    put.add(FAMILY, QUALIFIER, manualStamp, VALUE);
+    ht.put(put);
+
+    // The exact stamp must match ...
+    getVersionAndVerify(ht, ROW, FAMILY, QUALIFIER, manualStamp, VALUE);
+
+    // ... while the stamps directly before and after it must not
+    long [] neighbours = { manualStamp - 1, manualStamp + 1 };
+    for (long stamp : neighbours) {
+      getVersionAndVerifyMissing(ht, ROW, FAMILY, QUALIFIER, stamp);
+    }
+
+  }
+  
+  /**
+   * HBASE-1182
+   *    Scan for columns > some timestamp
+   *
+   *    Stores six versions of one column and runs the "greater than" get and
+   *    scan helpers from several start indices, before and after a flush.
+   */
+  @Test
+  public void testJiraTest1182() throws Exception {
+
+    byte [] tableName = Bytes.toBytes("testJiraTest1182");
+    byte [][] vals = makeNAscii(VALUE, 7);
+    long [] ts = makeStamps(7);
+
+    HTable ht = TEST_UTIL.createTable(tableName, FAMILY, 10);
+
+    // Insert versions 0 through 5 in a single Put
+    Put put = new Put(ROW);
+    for (int v = 0; v <= 5; v++) {
+      put.add(FAMILY, QUALIFIER, ts[v], vals[v]);
+    }
+    ht.put(put);
+
+    // Start indices handed to the helpers; the end index is always 5
+    int [] starts = { 0, 2, 4 };
+
+    // Pass 0 runs before the flush, pass 1 after it
+    for (int pass = 0; pass < 2; pass++) {
+      if (pass == 1) {
+        TEST_UTIL.flush();
+      }
+      for (int start : starts) {
+        getVersionRangeAndVerifyGreaterThan(ht, ROW, FAMILY, QUALIFIER, ts,
+            vals, start, 5);
+      }
+      for (int start : starts) {
+        scanVersionRangeAndVerifyGreaterThan(ht, ROW, FAMILY, QUALIFIER, ts,
+            vals, start, 5);
+      }
+    }
+  }
+  
+  /**
+   * HBASE-52
+   *    Add a means of scanning over all versions
+   *
+   *    Stores six versions of one column and reads all of them back with a
+   *    get and with a scan, before and after a flush.
+   */
+  @Test
+  public void testJiraTest52() throws Exception {
+    byte [] tableName = Bytes.toBytes("testJiraTest52");
+    byte [][] vals = makeNAscii(VALUE, 7);
+    long [] ts = makeStamps(7);
+
+    HTable ht = TEST_UTIL.createTable(tableName, FAMILY, 10);
+
+    // Insert versions 0 through 5 in a single Put
+    Put put = new Put(ROW);
+    for (int v = 0; v <= 5; v++) {
+      put.add(FAMILY, QUALIFIER, ts[v], vals[v]);
+    }
+    ht.put(put);
+
+    // Pass 0 runs before the flush, pass 1 after it
+    for (int pass = 0; pass < 2; pass++) {
+      if (pass == 1) {
+        TEST_UTIL.flush();
+      }
+      getAllVersionsAndVerify(ht, ROW, FAMILY, QUALIFIER, ts, vals, 0, 5);
+      scanAllVersionsAndVerify(ht, ROW, FAMILY, QUALIFIER, ts, vals, 0, 5);
+    }
+  }
+
+  //
+  // Bulk Testers
+  //
+  
+  private void getVersionRangeAndVerifyGreaterThan(HTable ht, byte [] row, 
+      byte [] family, byte [] qualifier, long [] stamps, byte [][] values, 
+      int start, int end)
+  throws IOException {
+    Get get = new Get(row);
+    get.addColumn(family, qualifier);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    get.setTimeRange(stamps[start+1], Long.MAX_VALUE);
+    Result result = ht.get(get);
+    assertNResult(result, row, family, qualifier, stamps, values, start+1, end);
+  }
+  
+  /**
+   * Gets every version of one column using the time range
+   * (stamps[start], stamps[end] + 1) and hands the Result to assertNResult
+   * with the index range start .. end.
+   */
+  private void getVersionRangeAndVerify(HTable ht, byte [] row, byte [] family,
+      byte [] qualifier, long [] stamps, byte [][] values, int start, int end)
+  throws IOException {
+    Get request = new Get(row);
+    request.addColumn(family, qualifier);
+    request.setMaxVersions(Integer.MAX_VALUE);
+    long lower = stamps[start];
+    long upper = stamps[end] + 1;
+    request.setTimeRange(lower, upper);
+    Result versions = ht.get(request);
+    assertNResult(versions, row, family, qualifier, stamps, values, start,
+        end);
+  }
+  
+  /**
+   * Gets one column with the maximum number of versions and no time range,
+   * then hands the Result to assertNResult with the index range start .. end.
+   */
+  private void getAllVersionsAndVerify(HTable ht, byte [] row, byte [] family,
+      byte [] qualifier, long [] stamps, byte [][] values, int start, int end)
+  throws IOException {
+    Get request = new Get(row);
+    request.addColumn(family, qualifier);
+    request.setMaxVersions(Integer.MAX_VALUE);
+    Result versions = ht.get(request);
+    assertNResult(versions, row, family, qualifier, stamps, values, start,
+        end);
+  }
+  
+  private void scanVersionRangeAndVerifyGreaterThan(HTable ht, byte [] row, 
+      byte [] family, byte [] qualifier, long [] stamps, byte [][] values, 
+      int start, int end)
+  throws IOException {
+    Scan scan = new Scan(row);
+    scan.addColumn(family, qualifier);
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    scan.setTimeRange(stamps[start+1], Long.MAX_VALUE);
+    Result result = getSingleScanResult(ht, scan);
+    assertNResult(result, row, family, qualifier, stamps, values, start+1, end);
+  }
+  
+  /**
+   * Scan counterpart of getVersionRangeAndVerify: scans from row using the
+   * time range (stamps[start], stamps[end] + 1) and hands the single scan
+   * result to assertNResult with the index range start .. end.
+   */
+  private void scanVersionRangeAndVerify(HTable ht, byte [] row, byte [] family,
+      byte [] qualifier, long [] stamps, byte [][] values, int start, int end)
+  throws IOException {
+    Scan request = new Scan(row);
+    request.addColumn(family, qualifier);
+    request.setMaxVersions(Integer.MAX_VALUE);
+    long lower = stamps[start];
+    long upper = stamps[end] + 1;
+    request.setTimeRange(lower, upper);
+    Result versions = getSingleScanResult(ht, request);
+    assertNResult(versions, row, family, qualifier, stamps, values, start,
+        end);
+  }
+
+  /**
+   * Scan counterpart of getAllVersionsAndVerify: scans from row with the
+   * maximum number of versions and no time range, then hands the single scan
+   * result to assertNResult with the index range start .. end.
+   */
+  private void scanAllVersionsAndVerify(HTable ht, byte [] row, byte [] family,
+      byte [] qualifier, long [] stamps, byte [][] values, int start, int end)
+  throws IOException {
+    Scan request = new Scan(row);
+    request.addColumn(family, qualifier);
+    request.setMaxVersions(Integer.MAX_VALUE);
+    Result versions = getSingleScanResult(ht, request);
+    assertNResult(versions, row, family, qualifier, stamps, values, start,
+        end);
+  }
+  
+  /**
+   * Gets one column restricted to the given timestamp and asserts, through
+   * assertSingleResult, that the Result holds the expected stamp and value.
+   */
+  private void getVersionAndVerify(HTable ht, byte [] row, byte [] family,
+      byte [] qualifier, long stamp, byte [] value)
+  throws Exception {
+    Get request = new Get(row);
+    request.addColumn(family, qualifier);
+    request.setTimeStamp(stamp);
+    request.setMaxVersions(Integer.MAX_VALUE);
+    Result found = ht.get(request);
+    assertSingleResult(found, row, family, qualifier, stamp, value);
+  }
+  
+  /**
+   * Gets one column restricted to the given timestamp and asserts, through
+   * assertEmptyResult, that nothing came back.
+   */
+  private void getVersionAndVerifyMissing(HTable ht, byte [] row, byte [] family,
+      byte [] qualifier, long stamp)
+  throws Exception {
+    Get request = new Get(row);
+    request.addColumn(family, qualifier);
+    request.setTimeStamp(stamp);
+    request.setMaxVersions(Integer.MAX_VALUE);
+    Result found = ht.get(request);
+    assertEmptyResult(found);
+  }
+  
+  /**
+   * Scans from row for one column restricted to the given timestamp and
+   * asserts, through assertSingleResult, that the single scan result holds
+   * the expected stamp and value.
+   */
+  private void scanVersionAndVerify(HTable ht, byte [] row, byte [] family,
+      byte [] qualifier, long stamp, byte [] value)
+  throws Exception {
+    Scan request = new Scan(row);
+    request.addColumn(family, qualifier);
+    request.setTimeStamp(stamp);
+    request.setMaxVersions(Integer.MAX_VALUE);
+    Result found = getSingleScanResult(ht, request);
+    assertSingleResult(found, row, family, qualifier, stamp, value);
+  }
+  
+  /**
+   * Scans from row for one column restricted to the given timestamp and
+   * asserts, through assertNullResult, that the scan produced no result.
+   */
+  private void scanVersionAndVerifyMissing(HTable ht, byte [] row, 
+      byte [] family, byte [] qualifier, long stamp)
+  throws Exception {
+    Scan request = new Scan(row);
+    request.addColumn(family, qualifier);
+    request.setTimeStamp(stamp);
+    request.setMaxVersions(Integer.MAX_VALUE);
+    Result found = getSingleScanResult(ht, request);
+    assertNullResult(found);
+  }
+  
+  private void getTestNull(HTable ht, byte [] row, byte [] family, 
+      byte [] value)
+  throws Exception {
+      
+    Get get = new Get(row);
+    get.addColumn(family, null);
+    Result result = ht.get(get);
+    assertSingleResult(result, row, family, null, value);
+    
+    get = new Get(row);
+    get.addColumn(family, HConstants.EMPTY_BYTE_ARRAY);
+    result = ht.get(get);
+    assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value);
+    
+    get = new Get(row);
+    get.addFamily(family);
+    result = ht.get(get);
+    assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value);
+    
+    get = new Get(row);
+    result = ht.get(get);
+    assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value);
+    
+  }
+  
+  private void scanTestNull(HTable ht, byte [] row, byte [] family, 
+      byte [] value)
+  throws Exception {
+    
+    Scan scan = new Scan();
+    scan.addColumn(family, null);
+    Result result = getSingleScanResult(ht, scan);
+    assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value);
+    
+    scan = new Scan();
+    scan.addColumn(family, HConstants.EMPTY_BYTE_ARRAY);
+    result = getSingleScanResult(ht, scan);
+    assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value);
+    
+    scan = new Scan();
+    scan.addFamily(family);
+    result = getSingleScanResult(ht, scan);
+    assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value);
+    
+    scan = new Scan();
+    result = getSingleScanResult(ht, scan);
+    assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value);
+    
+  }
+  
+  private void singleRowGetTest(HTable ht, byte [][] ROWS, byte [][] FAMILIES, 
+      byte [][] QUALIFIERS, byte [][] VALUES)
+  throws Exception {
+    
+    // Single column from memstore
+    Get get = new Get(ROWS[0]);
+    get.addColumn(FAMILIES[4], QUALIFIERS[0]);
+    Result result = ht.get(get);
+    assertSingleResult(result, ROWS[0], FAMILIES[4], QUALIFIERS[0], VALUES[0]);
+    
+    // Single column from storefile
+    get = new Get(ROWS[0]);
+    get.addColumn(FAMILIES[2], QUALIFIERS[2]);
+    result = ht.get(get);
+    assertSingleResult(result, ROWS[0], FAMILIES[2], QUALIFIERS[2], VALUES[2]);
+    
+    // Single column from storefile, family match
+    get = new Get(ROWS[0]);
+    get.addFamily(FAMILIES[7]);
+    result = ht.get(get);
+    assertSingleResult(result, ROWS[0], FAMILIES[7], QUALIFIERS[7], VALUES[7]);
+    
+    // Two columns, one from memstore one from storefile, same family,
+    // wildcard match
+    get = new Get(ROWS[0]);
+    get.addFamily(FAMILIES[4]);
+    result = ht.get(get);
+    assertDoubleResult(result, ROWS[0], FAMILIES[4], QUALIFIERS[0], VALUES[0],
+        FAMILIES[4], QUALIFIERS[4], VALUES[4]);
+    
+    // Two columns, one from memstore one from storefile, same family,
+    // explicit match
+    get = new Get(ROWS[0]);
+    get.addColumn(FAMILIES[4], QUALIFIERS[0]);
+    get.addColumn(FAMILIES[4], QUALIFIERS[4]);
+    result = ht.get(get);
+    assertDoubleResult(result, ROWS[0], FAMILIES[4], QUALIFIERS[0], VALUES[0],
+        FAMILIES[4], QUALIFIERS[4], VALUES[4]);
+  
+    // Three column, one from memstore two from storefile, different families,
+    // wildcard match
+    get = new Get(ROWS[0]);
+    get.addFamily(FAMILIES[4]);
+    get.addFamily(FAMILIES[7]);
+    result = ht.get(get);
+    assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
+        new int [][] { {4, 0, 0}, {4, 4, 4}, {7, 7, 7} });
+    
+    // Multiple columns from everywhere storefile, many family, wildcard
+    get = new Get(ROWS[0]);
+    get.addFamily(FAMILIES[2]);
+    get.addFamily(FAMILIES[4]);
+    get.addFamily(FAMILIES[6]);
+    get.addFamily(FAMILIES[7]);
+    result = ht.get(get);
+    assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
+        new int [][] { 
+          {2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}
+    });
+    
+    // Multiple columns from everywhere storefile, many family, wildcard
+    get = new Get(ROWS[0]);
+    get.addColumn(FAMILIES[2], QUALIFIERS[2]);
+    get.addColumn(FAMILIES[2], QUALIFIERS[4]);
+    get.addColumn(FAMILIES[4], QUALIFIERS[0]);
+    get.addColumn(FAMILIES[4], QUALIFIERS[4]);
+    get.addColumn(FAMILIES[6], QUALIFIERS[6]);
+    get.addColumn(FAMILIES[6], QUALIFIERS[7]);
+    get.addColumn(FAMILIES[7], QUALIFIERS[7]);
+    get.addColumn(FAMILIES[7], QUALIFIERS[8]);
+    result = ht.get(get);
+    assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
+        new int [][] { 
+          {2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}
+    });
+    
+    // Everything
+    get = new Get(ROWS[0]);
+    result = ht.get(get);
+    assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
+        new int [][] { 
+          {2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}, {9, 0, 0}
+    });
+    
+    // Get around inserted columns
+    
+    get = new Get(ROWS[1]);
+    result = ht.get(get);
+    assertEmptyResult(result);
+    
+    get = new Get(ROWS[0]);
+    get.addColumn(FAMILIES[4], QUALIFIERS[3]);
+    get.addColumn(FAMILIES[2], QUALIFIERS[3]);
+    result = ht.get(get);
+    assertEmptyResult(result);
+    
+  }
+  
+  private void singleRowScanTest(HTable ht, byte [][] ROWS, byte [][] FAMILIES, 
+      byte [][] QUALIFIERS, byte [][] VALUES)
+  throws Exception {
+  
+    // Single column from memstore
+    Scan scan = new Scan();
+    scan.addColumn(FAMILIES[4], QUALIFIERS[0]);
+    Result result = getSingleScanResult(ht, scan);
+    assertSingleResult(result, ROWS[0], FAMILIES[4], QUALIFIERS[0], VALUES[0]);
+    
+    // Single column from storefile
+    scan = new Scan();
+    scan.addColumn(FAMILIES[2], QUALIFIERS[2]);
+    result = getSingleScanResult(ht, scan);
+    assertSingleResult(result, ROWS[0], FAMILIES[2], QUALIFIERS[2], VALUES[2]);
+    
+    // Single column from storefile, family match
+    scan = new Scan();
+    scan.addFamily(FAMILIES[7]);
+    result = getSingleScanResult(ht, scan);
+    assertSingleResult(result, ROWS[0], FAMILIES[7], QUALIFIERS[7], VALUES[7]);
+    
+    // Two columns, one from memstore one from storefile, same family,
+    // wildcard match
+    scan = new Scan();
+    scan.addFamily(FAMILIES[4]);
+    result = getSingleScanResult(ht, scan);
+    assertDoubleResult(result, ROWS[0], FAMILIES[4], QUALIFIERS[0], VALUES[0],
+        FAMILIES[4], QUALIFIERS[4], VALUES[4]);
+    
+    // Two columns, one from memstore one from storefile, same family,
+    // explicit match
+    scan = new Scan();
+    scan.addColumn(FAMILIES[4], QUALIFIERS[0]);
+    scan.addColumn(FAMILIES[4], QUALIFIERS[4]);
+    result = getSingleScanResult(ht, scan);
+    assertDoubleResult(result, ROWS[0], FAMILIES[4], QUALIFIERS[0], VALUES[0],
+        FAMILIES[4], QUALIFIERS[4], VALUES[4]);
+  
+    // Three column, one from memstore two from storefile, different families,
+    // wildcard match
+    scan = new Scan();
+    scan.addFamily(FAMILIES[4]);
+    scan.addFamily(FAMILIES[7]);
+    result = getSingleScanResult(ht, scan);
+    assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
+        new int [][] { {4, 0, 0}, {4, 4, 4}, {7, 7, 7} });
+    
+    // Multiple columns from everywhere storefile, many family, wildcard
+    scan = new Scan();
+    scan.addFamily(FAMILIES[2]);
+    scan.addFamily(FAMILIES[4]);
+    scan.addFamily(FAMILIES[6]);
+    scan.addFamily(FAMILIES[7]);
+    result = getSingleScanResult(ht, scan);
+    assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
+        new int [][] { 
+          {2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}
+    });
+    
+    // Multiple columns from everywhere storefile, many family, wildcard
+    scan = new Scan();
+    scan.addColumn(FAMILIES[2], QUALIFIERS[2]);
+    scan.addColumn(FAMILIES[2], QUALIFIERS[4]);
+    scan.addColumn(FAMILIES[4], QUALIFIERS[0]);
+    scan.addColumn(FAMILIES[4], QUALIFIERS[4]);
+    scan.addColumn(FAMILIES[6], QUALIFIERS[6]);
+    scan.addColumn(FAMILIES[6], QUALIFIERS[7]);
+    scan.addColumn(FAMILIES[7], QUALIFIERS[7]);
+    scan.addColumn(FAMILIES[7], QUALIFIERS[8]);
+    result = getSingleScanResult(ht, scan);
+    assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
+        new int [][] { 
+          {2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}
+    });
+    
+    // Everything
+    scan = new Scan();
+    result = getSingleScanResult(ht, scan);
+    assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
+        new int [][] { 
+          {2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}, {9, 0, 0}
+    });
+    
+    // Scan around inserted columns
+    
+    scan = new Scan(ROWS[1]);
+    result = getSingleScanResult(ht, scan);
+    assertNullResult(result);
+    
+    scan = new Scan();
+    scan.addColumn(FAMILIES[4], QUALIFIERS[3]);
+    scan.addColumn(FAMILIES[2], QUALIFIERS[3]);
+    result = getSingleScanResult(ht, scan);
+    assertNullResult(result);
+  }
+
+  /**
+   * Verify a single column using gets.
+   * Expects family and qualifier arrays to be valid for at least 
+   * the range:  idx-2 < idx < idx+2
+   */
+  private void getVerifySingleColumn(HTable ht,

[... 1092 lines stripped ...]