You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/09/08 08:10:04 UTC

[5/7] incubator-tephra git commit: TEPHRA-176, TEPHRA-177: Adding maven modules for CDH-5.7, 5.8 support, HBase-1.1 and HBase-1.2 modules

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2246abff/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
new file mode 100644
index 0000000..de1fa6b
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
@@ -0,0 +1,1606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tephra.hbase;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.OperationWithAttributes;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.LongComparator;
+import org.apache.hadoop.hbase.filter.ValueFilter;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionConflictException;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for TransactionAwareHTables.
+ */
+public class TransactionAwareHTableTest {
+  private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTableTest.class);
+
+  private static HBaseTestingUtility testUtil;
+  private static HBaseAdmin hBaseAdmin;
+  private static TransactionStateStorage txStateStorage;
+  private static TransactionManager txManager;
+  private static Configuration conf;
+  private TransactionContext transactionContext;
+  private TransactionAwareHTable transactionAwareHTable;
+  private HTable hTable;
+
+  private static final class TestBytes {
+    private static final byte[] table = Bytes.toBytes("testtable");
+    private static final byte[] family = Bytes.toBytes("f1");
+    private static final byte[] family2 = Bytes.toBytes("f2");
+    private static final byte[] qualifier = Bytes.toBytes("col1");
+    private static final byte[] qualifier2 = Bytes.toBytes("col2");
+    private static final byte[] row = Bytes.toBytes("row");
+    private static final byte[] row2 = Bytes.toBytes("row2");
+    private static final byte[] row3 = Bytes.toBytes("row3");
+    private static final byte[] row4 = Bytes.toBytes("row4");
+    private static final byte[] value = Bytes.toBytes("value");
+    private static final byte[] value2 = Bytes.toBytes("value2");
+    private static final byte[] value3 = Bytes.toBytes("value3");
+  }
+  
+  private static final String TEST_ATTRIBUTE = "TEST_ATTRIBUTE";
+
+  public static class TestRegionObserver extends BaseRegionObserver {
+    @Override
+    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
+                       final Put put, final WALEdit edit,
+                       final Durability durability) throws IOException {
+      if (put.getAttribute(TEST_ATTRIBUTE) == null) {
+        throw new DoNotRetryIOException("Put should preserve attributes");
+      }
+      if (put.getDurability() != Durability.USE_DEFAULT) {
+        throw new DoNotRetryIOException("Durability is not propagated correctly");
+      }
+    }
+
+    @Override
+    public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
+                          final Delete delete, final WALEdit edit,
+                          final Durability durability) throws IOException {
+      if (delete.getAttribute(TEST_ATTRIBUTE) == null) {
+        throw new DoNotRetryIOException("Delete should preserve attributes");
+      }
+      if (delete.getDurability() != Durability.USE_DEFAULT) {
+        throw new DoNotRetryIOException("Durability is not propagated correctly");
+      }
+    }
+  }
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    testUtil = new HBaseTestingUtility();
+    conf = testUtil.getConfiguration();
+
+    // Tune down the connection thread pool size
+    conf.setInt("hbase.hconnection.threads.core", 5);
+    conf.setInt("hbase.hconnection.threads.max", 10);
+    // Tunn down handler threads in regionserver
+    conf.setInt("hbase.regionserver.handler.count", 10);
+
+    // Set to random port
+    conf.setInt("hbase.master.port", 0);
+    conf.setInt("hbase.master.info.port", 0);
+    conf.setInt("hbase.regionserver.port", 0);
+    conf.setInt("hbase.regionserver.info.port", 0);
+
+    testUtil.startMiniCluster();
+    hBaseAdmin = testUtil.getHBaseAdmin();
+    txStateStorage = new InMemoryTransactionStateStorage();
+    txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
+    txManager.startAndWait();
+  }
+
+  @AfterClass
+  public static void shutdownAfterClass() throws Exception {
+    testUtil.shutdownMiniCluster();
+    hBaseAdmin.close();
+  }
+
+  @Before
+  public void setupBeforeTest() throws Exception {
+    hTable = createTable(TestBytes.table, new byte[][]{TestBytes.family});
+    transactionAwareHTable = new TransactionAwareHTable(hTable);
+    transactionContext = new TransactionContext(new InMemoryTxSystemClient(txManager), transactionAwareHTable);
+  }
+
+  @After
+  public void shutdownAfterTest() throws IOException {
+    hBaseAdmin.disableTable(TestBytes.table);
+    hBaseAdmin.deleteTable(TestBytes.table);
+  }
+
+  private HTable createTable(byte[] tableName, byte[][] columnFamilies) throws Exception {
+    return createTable(tableName, columnFamilies, false, Collections.<String>emptyList());
+  }
+
+  private HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData, 
+    List<String> coprocessors) throws Exception {
+    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+    for (byte[] family : columnFamilies) {
+      HColumnDescriptor columnDesc = new HColumnDescriptor(family);
+      columnDesc.setMaxVersions(Integer.MAX_VALUE);
+      columnDesc.setValue(TxConstants.PROPERTY_TTL, String.valueOf(100000)); // in millis
+      desc.addFamily(columnDesc);
+    }
+    if (existingData) {
+      desc.setValue(TxConstants.READ_NON_TX_DATA, "true");
+    }
+    // Divide individually to prevent any overflow
+    int priority  = Coprocessor.PRIORITY_USER; 
+    desc.addCoprocessor(TransactionProcessor.class.getName(), null, priority, null);
+    // order in list is the same order that coprocessors will be invoked  
+    for (String coprocessor : coprocessors) {
+      desc.addCoprocessor(coprocessor, null, ++priority, null);
+    }
+    hBaseAdmin.createTable(desc);
+    testUtil.waitTableAvailable(tableName, 5000);
+    return new HTable(testUtil.getConfiguration(), tableName);
+   }
+
+  /**
+   * Test transactional put and get requests.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testValidTransactionalPutAndGet() throws Exception {
+    transactionContext.start();
+    Put put = new Put(TestBytes.row);
+    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+    transactionAwareHTable.put(put);
+    transactionContext.finish();
+
+    transactionContext.start();
+    Result result = transactionAwareHTable.get(new Get(TestBytes.row));
+    transactionContext.finish();
+
+    byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
+    assertArrayEquals(TestBytes.value, value);
+  }
+
+  /**
+   * Test aborted put requests, that must be rolled back.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testAbortedTransactionPutAndGet() throws Exception {
+    transactionContext.start();
+    Put put = new Put(TestBytes.row);
+    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+    transactionAwareHTable.put(put);
+
+    transactionContext.abort();
+
+    transactionContext.start();
+    Result result = transactionAwareHTable.get(new Get(TestBytes.row));
+    transactionContext.finish();
+    byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
+    assertArrayEquals(value, null);
+  }
+
+  /**
+   * Test transactional delete operations.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testValidTransactionalDelete() throws Exception {
+    try (HTable hTable = createTable(Bytes.toBytes("TestValidTransactionalDelete"),
+                                     new byte[][]{TestBytes.family, TestBytes.family2})) {
+      TransactionAwareHTable txTable = new TransactionAwareHTable(hTable);
+      TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
+
+      txContext.start();
+      Put put = new Put(TestBytes.row);
+      put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+      put.add(TestBytes.family2, TestBytes.qualifier, TestBytes.value2);
+      txTable.put(put);
+      txContext.finish();
+
+      txContext.start();
+      Result result = txTable.get(new Get(TestBytes.row));
+      txContext.finish();
+      byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
+      assertArrayEquals(TestBytes.value, value);
+      value = result.getValue(TestBytes.family2, TestBytes.qualifier);
+      assertArrayEquals(TestBytes.value2, value);
+
+      // test full row delete
+      txContext.start();
+      Delete delete = new Delete(TestBytes.row);
+      txTable.delete(delete);
+      txContext.finish();
+
+      txContext.start();
+      result = txTable.get(new Get(TestBytes.row));
+      txContext.finish();
+      assertTrue(result.isEmpty());
+
+      // test column delete
+      // load 10 rows
+      txContext.start();
+      int rowCount = 10;
+      for (int i = 0; i < rowCount; i++) {
+        Put p = new Put(Bytes.toBytes("row" + i));
+        for (int j = 0; j < 10; j++) {
+          p.add(TestBytes.family, Bytes.toBytes(j), TestBytes.value);
+        }
+        txTable.put(p);
+      }
+      txContext.finish();
+
+      // verify loaded rows
+      txContext.start();
+      for (int i = 0; i < rowCount; i++) {
+        Get g = new Get(Bytes.toBytes("row" + i));
+        Result r = txTable.get(g);
+        assertFalse(r.isEmpty());
+        for (int j = 0; j < 10; j++) {
+          assertArrayEquals(TestBytes.value, r.getValue(TestBytes.family, Bytes.toBytes(j)));
+        }
+      }
+      txContext.finish();
+
+      // delete odds columns from odd rows and even columns from even rows
+      txContext.start();
+      for (int i = 0; i < rowCount; i++) {
+        Delete d = new Delete(Bytes.toBytes("row" + i));
+        for (int j = 0; j < 10; j++) {
+          if (i % 2 == j % 2) {
+            d.deleteColumns(TestBytes.family, Bytes.toBytes(j));
+          }
+        }
+        txTable.delete(d);
+      }
+      txContext.finish();
+
+      // verify deleted columns
+      txContext.start();
+      for (int i = 0; i < rowCount; i++) {
+        Get g = new Get(Bytes.toBytes("row" + i));
+        Result r = txTable.get(g);
+        assertEquals(5, r.size());
+        for (Map.Entry<byte[], byte[]> entry : r.getFamilyMap(TestBytes.family).entrySet()) {
+          int col = Bytes.toInt(entry.getKey());
+          // each row should only have the opposite mod (odd=even, even=odd)
+          assertNotEquals(i % 2, col % 2);
+          assertArrayEquals(TestBytes.value, entry.getValue());
+        }
+      }
+      txContext.finish();
+
+      // test family delete
+      // load 10 rows
+      txContext.start();
+      for (int i = 0; i < rowCount; i++) {
+        Put p = new Put(Bytes.toBytes("famrow" + i));
+        p.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+        p.add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2);
+        txTable.put(p);
+      }
+      txContext.finish();
+
+      // verify all loaded rows
+      txContext.start();
+      for (int i = 0; i < rowCount; i++) {
+        Get g = new Get(Bytes.toBytes("famrow" + i));
+        Result r = txTable.get(g);
+        assertEquals(2, r.size());
+        assertArrayEquals(TestBytes.value, r.getValue(TestBytes.family, TestBytes.qualifier));
+        assertArrayEquals(TestBytes.value2, r.getValue(TestBytes.family2, TestBytes.qualifier2));
+      }
+      txContext.finish();
+
+      // delete family1 for even rows, family2 for odd rows
+      txContext.start();
+      for (int i = 0; i < rowCount; i++) {
+        Delete d = new Delete(Bytes.toBytes("famrow" + i));
+        d.deleteFamily((i % 2 == 0) ? TestBytes.family : TestBytes.family2);
+        txTable.delete(d);
+      }
+      txContext.finish();
+
+      // verify deleted families
+      txContext.start();
+      for (int i = 0; i < rowCount; i++) {
+        Get g = new Get(Bytes.toBytes("famrow" + i));
+        Result r = txTable.get(g);
+        assertEquals(1, r.size());
+        if (i % 2 == 0) {
+          assertNull(r.getValue(TestBytes.family, TestBytes.qualifier));
+          assertArrayEquals(TestBytes.value2, r.getValue(TestBytes.family2, TestBytes.qualifier2));
+        } else {
+          assertArrayEquals(TestBytes.value, r.getValue(TestBytes.family, TestBytes.qualifier));
+          assertNull(r.getValue(TestBytes.family2, TestBytes.qualifier2));
+        }
+      }
+      txContext.finish();
+    }
+  }
+
+  /**
+   * Test that put and delete attributes are preserved
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testAttributesPreserved() throws Exception {
+    HTable hTable = createTable(Bytes.toBytes("TestAttributesPreserved"),
+        new byte[][]{TestBytes.family, TestBytes.family2}, false,
+        Lists.newArrayList(TestRegionObserver.class.getName()));
+    try {
+      TransactionAwareHTable txTable = new TransactionAwareHTable(hTable);
+      TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
+
+      txContext.start();
+      Put put = new Put(TestBytes.row);
+      put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+      put.add(TestBytes.family2, TestBytes.qualifier, TestBytes.value2);
+      // set an attribute on the put, TestRegionObserver will verify it still exists
+      put.setAttribute(TEST_ATTRIBUTE, new byte[]{});
+      txTable.put(put);
+      txContext.finish();
+
+      txContext.start();
+      Result result = txTable.get(new Get(TestBytes.row));
+      txContext.finish();
+      byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
+      assertArrayEquals(TestBytes.value, value);
+      value = result.getValue(TestBytes.family2, TestBytes.qualifier);
+      assertArrayEquals(TestBytes.value2, value);
+
+      // test full row delete, TestRegionObserver will verify it still exists
+      txContext.start();
+      Delete delete = new Delete(TestBytes.row);
+      delete.setAttribute(TEST_ATTRIBUTE, new byte[]{});
+      txTable.delete(delete);
+      txContext.finish();
+
+      txContext.start();
+      result = txTable.get(new Get(TestBytes.row));
+      txContext.finish();
+      assertTrue(result.isEmpty());
+    } finally {
+        hTable.close();
+      }
+    }
+  
+  /**
+   * Test aborted transactional delete requests, that must be rolled back.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testAbortedTransactionalDelete() throws Exception {
+    transactionContext.start();
+    Put put = new Put(TestBytes.row);
+    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+    transactionAwareHTable.put(put);
+    transactionContext.finish();
+
+    transactionContext.start();
+    Result result = transactionAwareHTable.get(new Get(TestBytes.row));
+    transactionContext.finish();
+    byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
+    assertArrayEquals(TestBytes.value, value);
+
+    transactionContext.start();
+    Delete delete = new Delete(TestBytes.row);
+    transactionAwareHTable.delete(delete);
+    transactionContext.abort();
+
+    transactionContext.start();
+    result = transactionAwareHTable.get(new Get(TestBytes.row));
+    transactionContext.finish();
+    value = result.getValue(TestBytes.family, TestBytes.qualifier);
+    assertArrayEquals(TestBytes.value, value);
+  }
+
+  private void testDeleteRollback(TxConstants.ConflictDetection conflictDetection) throws Exception {
+    String tableName = String.format("%s%s", "TestColFamilyDelete", conflictDetection);
+    HTable hTable = createTable(Bytes.toBytes(tableName), new byte[][]{TestBytes.family});
+    try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, conflictDetection)) {
+      TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
+      txContext.start();
+      txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+      txContext.finish();
+
+      // Start a tx, delete the row and then abort the tx
+      txContext.start();
+      txTable.delete(new Delete(TestBytes.row));
+      txContext.abort();
+
+      // Start a tx, delete a column family and then abort the tx
+      txContext.start();
+      txTable.delete(new Delete(TestBytes.row).deleteFamily(TestBytes.family));
+      txContext.abort();
+
+      // Above operations should have no effect on the row, since they were aborted
+      txContext.start();
+      Get get = new Get(TestBytes.row);
+      Result result = txTable.get(get);
+      assertFalse(result.isEmpty());
+      assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family, TestBytes.qualifier));
+      txContext.finish();
+    }
+  }
+
+  @Test
+  public void testDeleteRollback() throws Exception {
+    testDeleteRollback(TxConstants.ConflictDetection.ROW);
+    testDeleteRollback(TxConstants.ConflictDetection.COLUMN);
+    testDeleteRollback(TxConstants.ConflictDetection.NONE);
+  }
+
+  @Test
+  public void testMultiColumnFamilyRowDeleteRollback() throws Exception {
+    HTable hTable = createTable(Bytes.toBytes("TestMultColFam"), new byte[][] {TestBytes.family, TestBytes.family2});
+    try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
+      TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
+      txContext.start();
+      txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+      txContext.finish();
+
+      txContext.start();
+      //noinspection ConstantConditions
+      txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL);
+      Result result = txTable.get(new Get(TestBytes.row));
+      Assert.assertEquals(1, result.getFamilyMap(TestBytes.family).size());
+      Assert.assertEquals(0, result.getFamilyMap(TestBytes.family2).size());
+      txContext.finish();
+
+      //Start a tx, delete the row and then abort the tx
+      txContext.start();
+      txTable.delete(new Delete(TestBytes.row));
+      txContext.abort();
+
+      //Start a tx and scan all the col families to make sure none of them have delete markers
+      txContext.start();
+      txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL);
+      result = txTable.get(new Get(TestBytes.row));
+      Assert.assertEquals(1, result.getFamilyMap(TestBytes.family).size());
+      Assert.assertEquals(0, result.getFamilyMap(TestBytes.family2).size());
+      txContext.finish();
+    }
+  }
+
+  @Test
+  public void testRowDelete() throws Exception {
+    HTable hTable = createTable(Bytes.toBytes("TestRowDelete"), new byte[][]{TestBytes.family, TestBytes.family2});
+    try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
+      TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
+
+      // Test 1: full row delete
+      txContext.start();
+      txTable.put(new Put(TestBytes.row)
+                    .add(TestBytes.family, TestBytes.qualifier, TestBytes.value)
+                    .add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2)
+                    .add(TestBytes.family2, TestBytes.qualifier, TestBytes.value)
+                    .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
+      txContext.finish();
+
+      txContext.start();
+      Get get = new Get(TestBytes.row);
+      Result result = txTable.get(get);
+      assertFalse(result.isEmpty());
+      assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family, TestBytes.qualifier));
+      assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family, TestBytes.qualifier2));
+      assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family2, TestBytes.qualifier));
+      assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family2, TestBytes.qualifier2));
+      txContext.finish();
+
+      // delete entire row
+      txContext.start();
+      txTable.delete(new Delete(TestBytes.row));
+      txContext.finish();
+
+      // verify row is now empty
+      txContext.start();
+      result = txTable.get(new Get(TestBytes.row));
+      assertTrue(result.isEmpty());
+
+      // verify row is empty for explicit column retrieval
+      result = txTable.get(new Get(TestBytes.row)
+                             .addColumn(TestBytes.family, TestBytes.qualifier)
+                             .addFamily(TestBytes.family2));
+      assertTrue(result.isEmpty());
+
+      // verify row is empty for scan
+      ResultScanner scanner = txTable.getScanner(new Scan(TestBytes.row));
+      assertNull(scanner.next());
+      scanner.close();
+
+      // verify row is empty for scan with explicit column
+      scanner = txTable.getScanner(new Scan(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier2));
+      assertNull(scanner.next());
+      scanner.close();
+      txContext.finish();
+
+      // write swapped values to one column per family
+      txContext.start();
+      txTable.put(new Put(TestBytes.row)
+                    .add(TestBytes.family, TestBytes.qualifier, TestBytes.value2)
+                    .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value));
+      txContext.finish();
+
+      // verify new values appear
+      txContext.start();
+      result = txTable.get(new Get(TestBytes.row));
+      assertFalse(result.isEmpty());
+      assertEquals(2, result.size());
+      assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family, TestBytes.qualifier));
+      assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family2, TestBytes.qualifier2));
+
+      scanner = txTable.getScanner(new Scan(TestBytes.row));
+      Result result1 = scanner.next();
+      assertNotNull(result1);
+      assertFalse(result1.isEmpty());
+      assertEquals(2, result1.size());
+      assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family, TestBytes.qualifier));
+      assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family2, TestBytes.qualifier2));
+      scanner.close();
+      txContext.finish();
+
+      // Test 2: delete of first column family
+      txContext.start();
+      txTable.put(new Put(TestBytes.row2)
+                    .add(TestBytes.family, TestBytes.qualifier, TestBytes.value)
+                    .add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2)
+                    .add(TestBytes.family2, TestBytes.qualifier, TestBytes.value)
+                    .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
+      txContext.finish();
+
+      txContext.start();
+      txTable.delete(new Delete(TestBytes.row2).deleteFamily(TestBytes.family));
+      txContext.finish();
+
+      txContext.start();
+      Result fam1Result = txTable.get(new Get(TestBytes.row2));
+      assertFalse(fam1Result.isEmpty());
+      assertEquals(2, fam1Result.size());
+      assertArrayEquals(TestBytes.value, fam1Result.getValue(TestBytes.family2, TestBytes.qualifier));
+      assertArrayEquals(TestBytes.value2, fam1Result.getValue(TestBytes.family2, TestBytes.qualifier2));
+      txContext.finish();
+
+      // Test 3: delete of second column family
+      txContext.start();
+      txTable.put(new Put(TestBytes.row3)
+                    .add(TestBytes.family, TestBytes.qualifier, TestBytes.value)
+                    .add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2)
+                    .add(TestBytes.family2, TestBytes.qualifier, TestBytes.value)
+                    .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
+      txContext.finish();
+
+      txContext.start();
+      txTable.delete(new Delete(TestBytes.row3).deleteFamily(TestBytes.family2));
+      txContext.finish();
+
+      txContext.start();
+      Result fam2Result = txTable.get(new Get(TestBytes.row3));
+      assertFalse(fam2Result.isEmpty());
+      assertEquals(2, fam2Result.size());
+      assertArrayEquals(TestBytes.value, fam2Result.getValue(TestBytes.family, TestBytes.qualifier));
+      assertArrayEquals(TestBytes.value2, fam2Result.getValue(TestBytes.family, TestBytes.qualifier2));
+      txContext.finish();
+
+      // Test 4: delete specific rows in a range
+      txContext.start();
+      for (int i = 0; i < 10; i++) {
+        txTable.put(new Put(Bytes.toBytes("z" + i))
+                      .add(TestBytes.family, TestBytes.qualifier, Bytes.toBytes(i))
+                      .add(TestBytes.family2, TestBytes.qualifier2, Bytes.toBytes(i)));
+      }
+      txContext.finish();
+
+      txContext.start();
+      // delete odd rows
+      for (int i = 1; i < 10; i += 2) {
+        txTable.delete(new Delete(Bytes.toBytes("z" + i)));
+      }
+      txContext.finish();
+
+      txContext.start();
+      int cnt = 0;
+      ResultScanner zScanner = txTable.getScanner(new Scan(Bytes.toBytes("z0")));
+      Result res;
+      while ((res = zScanner.next()) != null) {
+        assertFalse(res.isEmpty());
+        assertArrayEquals(Bytes.toBytes("z" + cnt), res.getRow());
+        assertArrayEquals(Bytes.toBytes(cnt), res.getValue(TestBytes.family, TestBytes.qualifier));
+        assertArrayEquals(Bytes.toBytes(cnt), res.getValue(TestBytes.family2, TestBytes.qualifier2));
+        cnt += 2;
+      }
+
+      // Test 5: delete prior writes in the same transaction
+      txContext.start();
+      txTable.put(new Put(TestBytes.row4)
+                    .add(TestBytes.family, TestBytes.qualifier, TestBytes.value)
+                    .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
+      txTable.delete(new Delete(TestBytes.row4));
+      txContext.finish();
+
+      txContext.start();
+      Result row4Result = txTable.get(new Get(TestBytes.row4));
+      assertTrue(row4Result.isEmpty());
+      txContext.finish();
+    }
+  }
+
+  /**
+   * Expect an exception since a transaction hasn't been started.
+   *
+   * @throws Exception
+   */
+  @Test(expected = IOException.class)
+  public void testTransactionlessFailure() throws Exception {
+    transactionAwareHTable.get(new Get(TestBytes.row));
+  }
+
+  /**
+   * Tests that each transaction can see its own persisted writes, while not seeing writes from other
+   * in-progress transactions.
+   */
+  @Test
+  public void testReadYourWrites() throws Exception {
+    // In-progress tx1: started before our main transaction
+    HTable hTable1 = new HTable(testUtil.getConfiguration(), TestBytes.table);
+    TransactionAwareHTable txHTable1 = new TransactionAwareHTable(hTable1);
+    TransactionContext inprogressTxContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), txHTable1);
+
+    // In-progress tx2: started while our main transaction is running
+    HTable hTable2 = new HTable(testUtil.getConfiguration(), TestBytes.table);
+    TransactionAwareHTable txHTable2 = new TransactionAwareHTable(hTable2);
+    TransactionContext inprogressTxContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txHTable2);
+
+    // create an in-progress write that should be ignored
+    byte[] col2 = Bytes.toBytes("col2");
+    inprogressTxContext1.start();
+    Put putCol2 = new Put(TestBytes.row);
+    byte[] valueCol2 = Bytes.toBytes("writing in progress");
+    putCol2.add(TestBytes.family, col2, valueCol2);
+    txHTable1.put(putCol2);
+
+    // start a tx and write a value to test reading in same tx
+    transactionContext.start();
+    Put put = new Put(TestBytes.row);
+    byte[] value = Bytes.toBytes("writing");
+    put.add(TestBytes.family, TestBytes.qualifier, value);
+    transactionAwareHTable.put(put);
+
+    // test that a write from a tx started after the first is not visible
+    inprogressTxContext2.start();
+    Put put2 = new Put(TestBytes.row);
+    byte[] value2 = Bytes.toBytes("writing2");
+    put2.add(TestBytes.family, TestBytes.qualifier, value2);
+    txHTable2.put(put2);
+
+    Get get = new Get(TestBytes.row);
+    Result row = transactionAwareHTable.get(get);
+    assertFalse(row.isEmpty());
+    byte[] col1Value = row.getValue(TestBytes.family, TestBytes.qualifier);
+    Assert.assertNotNull(col1Value);
+    Assert.assertArrayEquals(value, col1Value);
+    // write from in-progress transaction should not be visible
+    byte[] col2Value = row.getValue(TestBytes.family, col2);
+    assertNull(col2Value);
+
+    // commit in-progress transaction, should still not be visible
+    inprogressTxContext1.finish();
+
+    get = new Get(TestBytes.row);
+    row = transactionAwareHTable.get(get);
+    assertFalse(row.isEmpty());
+    col2Value = row.getValue(TestBytes.family, col2);
+    assertNull(col2Value);
+
+    transactionContext.finish();
+
+    inprogressTxContext2.abort();
+  }
+
+  @Test
+  public void testRowLevelConflictDetection() throws Exception {
+    TransactionAwareHTable txTable1 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+        TxConstants.ConflictDetection.ROW);
+    TransactionContext txContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable1);
+
+    TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+        TxConstants.ConflictDetection.ROW);
+    TransactionContext txContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable2);
+
+    byte[] row1 = Bytes.toBytes("row1");
+    byte[] row2 = Bytes.toBytes("row2");
+    byte[] col1 = Bytes.toBytes("c1");
+    byte[] col2 = Bytes.toBytes("c2");
+    byte[] val1 = Bytes.toBytes("val1");
+    byte[] val2 = Bytes.toBytes("val2");
+
+    // test that concurrent writing to different rows succeeds
+    txContext1.start();
+    txTable1.put(new Put(row1).add(TestBytes.family, col1, val1));
+
+    txContext2.start();
+    txTable2.put(new Put(row2).add(TestBytes.family, col1, val2));
+
+    // should be no conflicts
+    txContext1.finish();
+    txContext2.finish();
+
+    transactionContext.start();
+    Result res = transactionAwareHTable.get(new Get(row1));
+    assertFalse(res.isEmpty());
+    Cell cell = res.getColumnLatestCell(TestBytes.family, col1);
+    assertNotNull(cell);
+    assertArrayEquals(val1, CellUtil.cloneValue(cell));
+
+    res = transactionAwareHTable.get(new Get(row2));
+    assertFalse(res.isEmpty());
+    cell = res.getColumnLatestCell(TestBytes.family, col1);
+    assertNotNull(cell);
+    assertArrayEquals(val2, CellUtil.cloneValue(cell));
+    transactionContext.finish();
+
+    // test that writing to different columns in the same row fails
+    txContext1.start();
+    txTable1.put(new Put(row1).add(TestBytes.family, col1, val2));
+
+    txContext2.start();
+    txTable2.put(new Put(row1).add(TestBytes.family, col2, val2));
+
+    txContext1.finish();
+    try {
+      txContext2.finish();
+      fail("txContext2 should have encountered a row-level conflict during commit");
+    } catch (TransactionConflictException tce) {
+      txContext2.abort();
+    }
+
+    transactionContext.start();
+    res = transactionAwareHTable.get(new Get(row1));
+    assertFalse(res.isEmpty());
+    cell = res.getColumnLatestCell(TestBytes.family, col1);
+    assertNotNull(cell);
+    // should now be val2
+    assertArrayEquals(val2, CellUtil.cloneValue(cell));
+
+    cell = res.getColumnLatestCell(TestBytes.family, col2);
+    // col2 should not be visible due to conflict
+    assertNull(cell);
+    transactionContext.finish();
+
+    // test that writing to the same column in the same row fails
+    txContext1.start();
+    txTable1.put(new Put(row2).add(TestBytes.family, col2, val1));
+
+    txContext2.start();
+    txTable2.put(new Put(row2).add(TestBytes.family, col2, val2));
+
+    txContext1.finish();
+    try {
+      txContext2.finish();
+      fail("txContext2 should have encountered a row and column level conflict during commit");
+    } catch (TransactionConflictException tce) {
+      txContext2.abort();
+    }
+
+    transactionContext.start();
+    res = transactionAwareHTable.get(new Get(row2));
+    assertFalse(res.isEmpty());
+    cell = res.getColumnLatestCell(TestBytes.family, col2);
+    assertNotNull(cell);
+    // should now be val1
+    assertArrayEquals(val1, CellUtil.cloneValue(cell));
+    transactionContext.finish();
+
+    // verify change set that is being reported only on rows
+    txContext1.start();
+    txTable1.put(new Put(row1).add(TestBytes.family, col1, val1));
+    txTable1.put(new Put(row2).add(TestBytes.family, col2, val2));
+
+    Collection<byte[]> changeSet = txTable1.getTxChanges();
+    assertNotNull(changeSet);
+    assertEquals(2, changeSet.size());
+    assertTrue(changeSet.contains(txTable1.getChangeKey(row1, null, null)));
+    assertTrue(changeSet.contains(txTable1.getChangeKey(row2, null, null)));
+    txContext1.finish();
+  }
+
+  @Test
+  public void testNoneLevelConflictDetection() throws Exception {
+    InMemoryTxSystemClient txClient = new InMemoryTxSystemClient(txManager);
+    TransactionAwareHTable txTable1 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+        TxConstants.ConflictDetection.NONE);
+    TransactionContext txContext1 = new TransactionContext(txClient, txTable1);
+
+    TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+        TxConstants.ConflictDetection.NONE);
+    TransactionContext txContext2 = new TransactionContext(txClient, txTable2);
+
+    // overlapping writes to the same row + column should not conflict
+
+    txContext1.start();
+    txTable1.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+
+    // changes should not be visible yet
+    txContext2.start();
+    Result row = txTable2.get(new Get(TestBytes.row));
+    assertTrue(row.isEmpty());
+
+    txTable2.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
+
+    // both commits should succeed
+    txContext1.finish();
+    txContext2.finish();
+
+    txContext1.start();
+    row = txTable1.get(new Get(TestBytes.row));
+    assertFalse(row.isEmpty());
+    assertArrayEquals(TestBytes.value2, row.getValue(TestBytes.family, TestBytes.qualifier));
+    txContext1.finish();
+
+    // transaction abort should still rollback changes
+
+    txContext1.start();
+    txTable1.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+    txContext1.abort();
+
+    // changes to row2 should be rolled back
+    txContext2.start();
+    Result row2 = txTable2.get(new Get(TestBytes.row2));
+    assertTrue(row2.isEmpty());
+    txContext2.finish();
+
+    // transaction invalidate should still make changes invisible
+
+    txContext1.start();
+    Transaction tx1 = txContext1.getCurrentTransaction();
+    txTable1.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+    assertNotNull(tx1);
+    txClient.invalidate(tx1.getWritePointer());
+
+    // changes to row2 should be rolled back
+    txContext2.start();
+    Result row3 = txTable2.get(new Get(TestBytes.row3));
+    assertTrue(row3.isEmpty());
+    txContext2.finish();
+  }
+
+  @Test
+  public void testCheckpoint() throws Exception {
+    // start a transaction, using checkpoints between writes
+    transactionContext.start();
+    transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+    Transaction origTx = transactionContext.getCurrentTransaction();
+    transactionContext.checkpoint();
+    Transaction postCheckpointTx = transactionContext.getCurrentTransaction();
+
+    assertEquals(origTx.getTransactionId(), postCheckpointTx.getTransactionId());
+    assertNotEquals(origTx.getWritePointer(), postCheckpointTx.getWritePointer());
+    long[] checkpointPtrs = postCheckpointTx.getCheckpointWritePointers();
+    assertEquals(1, checkpointPtrs.length);
+    assertEquals(postCheckpointTx.getWritePointer(), checkpointPtrs[0]);
+
+    transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
+    transactionContext.checkpoint();
+    Transaction postCheckpointTx2 = transactionContext.getCurrentTransaction();
+
+    assertEquals(origTx.getTransactionId(), postCheckpointTx2.getTransactionId());
+    assertNotEquals(postCheckpointTx.getWritePointer(), postCheckpointTx2.getWritePointer());
+    long[] checkpointPtrs2 = postCheckpointTx2.getCheckpointWritePointers();
+    assertEquals(2, checkpointPtrs2.length);
+    assertEquals(postCheckpointTx.getWritePointer(), checkpointPtrs2[0]);
+    assertEquals(postCheckpointTx2.getWritePointer(), checkpointPtrs2[1]);
+
+    transactionAwareHTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+
+    // by default, all rows should be visible with Read-Your-Writes
+    verifyRow(transactionAwareHTable, TestBytes.row, TestBytes.value);
+    verifyRow(transactionAwareHTable, TestBytes.row2, TestBytes.value2);
+    verifyRow(transactionAwareHTable, TestBytes.row3, TestBytes.value);
+
+    // when disabling current write pointer, only the previous checkpoints should be visible
+    transactionContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
+    Get get = new Get(TestBytes.row);
+    verifyRow(transactionAwareHTable, get, TestBytes.value);
+    get = new Get(TestBytes.row2);
+    verifyRow(transactionAwareHTable, get, TestBytes.value2);
+    get = new Get(TestBytes.row3);
+    verifyRow(transactionAwareHTable, get, null);
+
+    // test scan results excluding current write pointer
+    Scan scan = new Scan();
+    ResultScanner scanner = transactionAwareHTable.getScanner(scan);
+
+    Result row = scanner.next();
+    assertNotNull(row);
+    assertArrayEquals(TestBytes.row, row.getRow());
+    assertEquals(1, row.size());
+    assertArrayEquals(TestBytes.value, row.getValue(TestBytes.family, TestBytes.qualifier));
+
+    row = scanner.next();
+    assertNotNull(row);
+    assertArrayEquals(TestBytes.row2, row.getRow());
+    assertEquals(1, row.size());
+    assertArrayEquals(TestBytes.value2, row.getValue(TestBytes.family, TestBytes.qualifier));
+
+    row = scanner.next();
+    assertNull(row);
+    scanner.close();
+    transactionContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT);
+
+    // commit transaction, verify writes are visible
+    transactionContext.finish();
+
+    transactionContext.start();
+    verifyRow(transactionAwareHTable, TestBytes.row, TestBytes.value);
+    verifyRow(transactionAwareHTable, TestBytes.row2, TestBytes.value2);
+    verifyRow(transactionAwareHTable, TestBytes.row3, TestBytes.value);
+    transactionContext.finish();
+  }
+
+  @Test
+  public void testInProgressCheckpoint() throws Exception {
+    // start a transaction, using checkpoints between writes
+    transactionContext.start();
+    transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+    transactionContext.checkpoint();
+    transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
+
+    // check that writes are still not visible to other clients
+    TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table));
+    TransactionContext txContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable2);
+
+    txContext2.start();
+    verifyRow(txTable2, TestBytes.row, null);
+    verifyRow(txTable2, TestBytes.row2, null);
+    txContext2.finish();
+    txTable2.close();
+
+    transactionContext.finish();
+
+    // verify writes are visible after commit
+    transactionContext.start();
+    verifyRow(transactionAwareHTable, TestBytes.row, TestBytes.value);
+    verifyRow(transactionAwareHTable, TestBytes.row2, TestBytes.value2);
+    transactionContext.finish();
+  }
+
+  @Test
+  public void testCheckpointRollback() throws Exception {
+    // start a transaction, using checkpoints between writes
+    transactionContext.start();
+    transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+    transactionContext.checkpoint();
+    transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
+    transactionContext.checkpoint();
+    transactionAwareHTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+
+    transactionContext.abort();
+
+    transactionContext.start();
+    verifyRow(transactionAwareHTable, TestBytes.row, null);
+    verifyRow(transactionAwareHTable, TestBytes.row2, null);
+    verifyRow(transactionAwareHTable, TestBytes.row3, null);
+
+    Scan scan = new Scan();
+    ResultScanner scanner = transactionAwareHTable.getScanner(scan);
+    assertNull(scanner.next());
+    scanner.close();
+    transactionContext.finish();
+  }
+
+  @Test
+  public void testCheckpointInvalidate() throws Exception {
+    // start a transaction, using checkpoints between writes
+    transactionContext.start();
+    Transaction origTx = transactionContext.getCurrentTransaction();
+    transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+    transactionContext.checkpoint();
+    Transaction checkpointTx1 = transactionContext.getCurrentTransaction();
+    transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
+    transactionContext.checkpoint();
+    Transaction checkpointTx2 = transactionContext.getCurrentTransaction();
+    transactionAwareHTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+
+    TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager);
+    txClient.invalidate(transactionContext.getCurrentTransaction().getTransactionId());
+
+    // check that writes are not visible
+    TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table));
+    TransactionContext txContext2 = new TransactionContext(txClient, txTable2);
+    txContext2.start();
+    Transaction newTx = txContext2.getCurrentTransaction();
+
+    // all 3 writes pointers from the previous transaction should now be excluded
+    assertTrue(newTx.isExcluded(origTx.getWritePointer()));
+    assertTrue(newTx.isExcluded(checkpointTx1.getWritePointer()));
+    assertTrue(newTx.isExcluded(checkpointTx2.getWritePointer()));
+
+    verifyRow(txTable2, TestBytes.row, null);
+    verifyRow(txTable2, TestBytes.row2, null);
+    verifyRow(txTable2, TestBytes.row3, null);
+
+    Scan scan = new Scan();
+    ResultScanner scanner = txTable2.getScanner(scan);
+    assertNull(scanner.next());
+    scanner.close();
+    txContext2.finish();
+  }
+
+  @Test
+  public void testExistingData() throws Exception {
+    byte[] val11 = Bytes.toBytes("val11");
+    byte[] val12 = Bytes.toBytes("val12");
+    byte[] val21 = Bytes.toBytes("val21");
+    byte[] val22 = Bytes.toBytes("val22");
+    byte[] val31 = Bytes.toBytes("val31");
+    byte[] val111 = Bytes.toBytes("val111");
+
+    TransactionAwareHTable txTable =
+      new TransactionAwareHTable(createTable(Bytes.toBytes("testExistingData"), new byte[][]{TestBytes.family}, true, 
+      Collections.<String>emptyList()));
+    TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
+
+    // Add some pre-existing, non-transactional data
+    HTable nonTxTable = new HTable(testUtil.getConfiguration(), txTable.getTableName());
+    nonTxTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, val11));
+    nonTxTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier2, val12));
+    nonTxTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, val21));
+    nonTxTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier2, val22));
+    nonTxTable.put(new Put(TestBytes.row4).add(TestBytes.family, TxConstants.FAMILY_DELETE_QUALIFIER,
+                                               HConstants.EMPTY_BYTE_ARRAY));
+    nonTxTable.put(new Put(TestBytes.row4).add(TestBytes.family, TestBytes.qualifier, HConstants.EMPTY_BYTE_ARRAY));
+    nonTxTable.flushCommits();
+
+    // Add transactional data
+    txContext.start();
+    txTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, val31));
+    txContext.finish();
+
+    txContext.start();
+    // test get
+    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier), val11);
+    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2), val12);
+    verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier), val21);
+    verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier2), val22);
+    verifyRow(txTable, new Get(TestBytes.row3).addColumn(TestBytes.family, TestBytes.qualifier), val31);
+    verifyRow(txTable, new Get(TestBytes.row4).addColumn(TestBytes.family, TxConstants.FAMILY_DELETE_QUALIFIER),
+              HConstants.EMPTY_BYTE_ARRAY);
+    verifyRow(txTable, new Get(TestBytes.row4).addColumn(TestBytes.family, TestBytes.qualifier),
+              HConstants.EMPTY_BYTE_ARRAY);
+
+    // test scan
+    try (ResultScanner scanner = txTable.getScanner(new Scan())) {
+      Result result = scanner.next();
+      assertNotNull(result);
+      assertArrayEquals(TestBytes.row, result.getRow());
+      assertArrayEquals(val11, result.getValue(TestBytes.family, TestBytes.qualifier));
+      assertArrayEquals(val12, result.getValue(TestBytes.family, TestBytes.qualifier2));
+      result = scanner.next();
+      assertNotNull(result);
+      assertArrayEquals(TestBytes.row2, result.getRow());
+      assertArrayEquals(val21, result.getValue(TestBytes.family, TestBytes.qualifier));
+      assertArrayEquals(val22, result.getValue(TestBytes.family, TestBytes.qualifier2));
+      result = scanner.next();
+      assertNotNull(result);
+      assertArrayEquals(TestBytes.row3, result.getRow());
+      assertArrayEquals(val31, result.getValue(TestBytes.family, TestBytes.qualifier));
+      result = scanner.next();
+      assertNotNull(result);
+      assertArrayEquals(TestBytes.row4, result.getRow());
+      assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, result.getValue(TestBytes.family,
+                                                                     TxConstants.FAMILY_DELETE_QUALIFIER));
+      assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, result.getValue(TestBytes.family, TestBytes.qualifier));
+      assertNull(scanner.next());
+    }
+    txContext.finish();
+
+    // test update and delete
+    txContext.start();
+    txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, val111));
+    txTable.delete(new Delete(TestBytes.row2).deleteColumns(TestBytes.family, TestBytes.qualifier));
+    txContext.finish();
+
+    txContext.start();
+    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier), val111);
+    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2), val12);
+    verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier), null);
+    verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier2), val22);
+    verifyRow(txTable, new Get(TestBytes.row3).addColumn(TestBytes.family, TestBytes.qualifier), val31);
+    verifyRow(txTable, new Get(TestBytes.row4).addColumn(TestBytes.family, TxConstants.FAMILY_DELETE_QUALIFIER),
+              HConstants.EMPTY_BYTE_ARRAY);
+    verifyRow(txTable, new Get(TestBytes.row4).addColumn(TestBytes.family, TestBytes.qualifier),
+              HConstants.EMPTY_BYTE_ARRAY);
+    txContext.finish();
+
+    // test scan
+    txContext.start();
+    try (ResultScanner scanner = txTable.getScanner(new Scan())) {
+      Result result = scanner.next();
+      assertNotNull(result);
+      assertArrayEquals(TestBytes.row, result.getRow());
+      assertArrayEquals(val111, result.getValue(TestBytes.family, TestBytes.qualifier));
+      assertArrayEquals(val12, result.getValue(TestBytes.family, TestBytes.qualifier2));
+      result = scanner.next();
+      assertNotNull(result);
+      assertArrayEquals(TestBytes.row2, result.getRow());
+      assertArrayEquals(null, result.getValue(TestBytes.family, TestBytes.qualifier));
+      assertArrayEquals(val22, result.getValue(TestBytes.family, TestBytes.qualifier2));
+      result = scanner.next();
+      assertNotNull(result);
+      assertArrayEquals(TestBytes.row3, result.getRow());
+      assertArrayEquals(val31, result.getValue(TestBytes.family, TestBytes.qualifier));
+      result = scanner.next();
+      assertNotNull(result);
+      assertArrayEquals(TestBytes.row4, result.getRow());
+      assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, result.getValue(TestBytes.family,
+                                                                     TxConstants.FAMILY_DELETE_QUALIFIER));
+      assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, result.getValue(TestBytes.family, TestBytes.qualifier));
+      assertNull(scanner.next());
+    }
+    txContext.finish();
+  }
+
+  private void verifyRow(HTableInterface table, byte[] rowkey, byte[] expectedValue) throws Exception {
+    verifyRow(table, new Get(rowkey), expectedValue);
+  }
+
+  private void verifyRow(HTableInterface table, Get get, byte[] expectedValue) throws Exception {
+    verifyRows(table, get, expectedValue == null ? null : ImmutableList.of(expectedValue));
+  }
+
+  private void verifyRows(HTableInterface table, Get get, List<byte[]> expectedValues) throws Exception {
+    Result result = table.get(get);
+    if (expectedValues == null) {
+      assertTrue(result.isEmpty());
+    } else {
+      assertFalse(result.isEmpty());
+      byte[] family = TestBytes.family;
+      byte[] col = TestBytes.qualifier;
+      if (get.hasFamilies()) {
+        family = get.getFamilyMap().keySet().iterator().next();
+        col = get.getFamilyMap().get(family).first();
+      }
+      Iterator<Cell> it = result.getColumnCells(family, col).iterator();
+      for (byte[] expectedValue : expectedValues) {
+        Assert.assertTrue(it.hasNext());
+        assertArrayEquals(expectedValue, CellUtil.cloneValue(it.next()));
+      }
+    }
+  }
+
+  private Cell[] getRow(HTableInterface table, Get get) throws Exception {
+    Result result = table.get(get);
+    return result.rawCells();
+  }
+
+  private void verifyScan(HTableInterface table, Scan scan, List<KeyValue> expectedCells) throws Exception {
+    List<Cell> actualCells = new ArrayList<>();
+    try (ResultScanner scanner = table.getScanner(scan)) {
+      Result[] results = scanner.next(expectedCells.size() + 1);
+      for (Result result : results) {
+        actualCells.addAll(Lists.newArrayList(result.rawCells()));
+      }
+      Assert.assertEquals(expectedCells, actualCells);
+    }
+  }
+
+  @Test
+  public void testVisibilityAll() throws Exception {
+    HTable nonTxTable = createTable(Bytes.toBytes("testVisibilityAll"),
+      new byte[][]{TestBytes.family, TestBytes.family2}, true, Collections.<String>emptyList());
+    TransactionAwareHTable txTable =
+      new TransactionAwareHTable(nonTxTable,
+                                 TxConstants.ConflictDetection.ROW); // ROW conflict detection to verify family deletes
+    TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
+
+    // start a transaction and create a delete marker
+    txContext.start();
+    //noinspection ConstantConditions
+    long txWp0 = txContext.getCurrentTransaction().getWritePointer();
+    txTable.delete(new Delete(TestBytes.row).deleteColumn(TestBytes.family, TestBytes.qualifier2));
+    txContext.finish();
+
+    // start a new transaction and write some values
+    txContext.start();
+    @SuppressWarnings("ConstantConditions")
+    long txWp1 = txContext.getCurrentTransaction().getWritePointer();
+    txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+    txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2));
+    txTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
+    txTable.put(new Put(TestBytes.row).add(TestBytes.family2, TestBytes.qualifier, TestBytes.value));
+    txTable.put(new Put(TestBytes.row).add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
+
+    // verify written data
+    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier),
+              TestBytes.value);
+    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2),
+              TestBytes.value2);
+    verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier),
+              TestBytes.value);
+    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier),
+              TestBytes.value);
+    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier2),
+              TestBytes.value2);
+
+    // checkpoint and make changes to written data now
+    txContext.checkpoint();
+    long txWp2 = txContext.getCurrentTransaction().getWritePointer();
+    // delete a column
+    txTable.delete(new Delete(TestBytes.row).deleteColumn(TestBytes.family, TestBytes.qualifier));
+    // no change to a column
+    txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2));
+    // update a column
+    txTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value3));
+    // delete column family
+    txTable.delete(new Delete(TestBytes.row).deleteFamily(TestBytes.family2));
+
+    // verify changed values
+    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier),
+              null);
+    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2),
+              TestBytes.value2);
+    verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier),
+              TestBytes.value3);
+    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier),
+              null);
+    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier2),
+              null);
+
+    // run a scan with VisibilityLevel.ALL, this should return all raw changes by this transaction,
+    // and the raw change by prior transaction
+    //noinspection ConstantConditions
+    txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL);
+    List<KeyValue> expected = ImmutableList.of(
+      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier, txWp2, KeyValue.Type.DeleteColumn),
+      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier, txWp1, TestBytes.value),
+      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2),
+      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp1, TestBytes.value2),
+      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp0, KeyValue.Type.DeleteColumn),
+      new KeyValue(TestBytes.row, TestBytes.family2, null, txWp2, KeyValue.Type.DeleteFamily),
+      new KeyValue(TestBytes.row, TestBytes.family2, TestBytes.qualifier, txWp1, TestBytes.value),
+      new KeyValue(TestBytes.row, TestBytes.family2, TestBytes.qualifier2, txWp1, TestBytes.value2),
+      new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3),
+      new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp1, TestBytes.value)
+    );
+    verifyScan(txTable, new Scan(), expected);
+
+    // verify a Get is also able to return all snapshot versions
+    Get get = new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier);
+    Cell[] cells = getRow(txTable, get);
+    Assert.assertEquals(2, cells.length);
+    Assert.assertTrue(CellUtil.isDelete(cells[0]));
+    Assert.assertArrayEquals(TestBytes.value, CellUtil.cloneValue(cells[1]));
+
+    get = new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2);
+    cells = getRow(txTable, get);
+    Assert.assertEquals(3, cells.length);
+    Assert.assertArrayEquals(TestBytes.value2, CellUtil.cloneValue(cells[0]));
+    Assert.assertArrayEquals(TestBytes.value2, CellUtil.cloneValue(cells[1]));
+    Assert.assertTrue(CellUtil.isDeleteColumns(cells[2]));
+
+    verifyRows(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier),
+               ImmutableList.of(TestBytes.value3, TestBytes.value));
+
+    get = new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier);
+    cells = getRow(txTable, get);
+    Assert.assertEquals(2, cells.length);
+    Assert.assertTrue(CellUtil.isDelete(cells[0]));
+    Assert.assertArrayEquals(TestBytes.value, CellUtil.cloneValue(cells[1]));
+
+    get = new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier2);
+    cells = getRow(txTable, get);
+    Assert.assertEquals(2, cells.length);
+    Assert.assertTrue(CellUtil.isDelete(cells[0]));
+    Assert.assertArrayEquals(TestBytes.value2, CellUtil.cloneValue(cells[1]));
+
+    // Verify VisibilityLevel.SNAPSHOT
+    txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT);
+    expected = ImmutableList.of(
+      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2),
+      new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3)
+    );
+    verifyScan(txTable, new Scan(), expected);
+
+    // Verify VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT
+    txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
+    expected = ImmutableList.of(
+      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier, txWp1, TestBytes.value),
+      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp1, TestBytes.value2),
+      new KeyValue(TestBytes.row, TestBytes.family2, TestBytes.qualifier, txWp1, TestBytes.value),
+      new KeyValue(TestBytes.row, TestBytes.family2, TestBytes.qualifier2, txWp1, TestBytes.value2),
+      new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp1, TestBytes.value)
+    );
+    verifyScan(txTable, new Scan(), expected);
+    txContext.finish();
+
+    // finally verify values once more after commit, this time we should get only committed raw values for
+    // all visibility levels
+    txContext.start();
+    txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL);
+    expected = ImmutableList.of(
+      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier, txWp2, KeyValue.Type.DeleteColumn),
+      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2),
+      new KeyValue(TestBytes.row, TestBytes.family2, null, txWp2, KeyValue.Type.DeleteFamily),
+      new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3)
+    );
+    verifyScan(txTable, new Scan(), expected);
+
+    txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT);
+    expected = ImmutableList.of(
+      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2),
+      new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3)
+    );
+    verifyScan(txTable, new Scan(), expected);
+
+    txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
+    expected = ImmutableList.of(
+      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2),
+      new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3)
+    );
+    verifyScan(txTable, new Scan(), expected);
+
+    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier),
+              null);
+    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2),
+              TestBytes.value2);
+    verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier),
+              TestBytes.value3);
+    txContext.finish();
+
+    // Test with regular HBase deletes in pre-existing data
+    long now = System.currentTimeMillis();
+    Delete deleteColumn = new Delete(TestBytes.row3).deleteColumn(TestBytes.family, TestBytes.qualifier, now - 1);
+    // to prevent Tephra from replacing delete with delete marker
+    deleteColumn.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+    nonTxTable.delete(deleteColumn);
+    Delete deleteFamily = new Delete(TestBytes.row3).deleteFamily(TestBytes.family2, now);
+    // to prevent Tephra from replacing delete with delete marker
+    deleteFamily.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+    nonTxTable.delete(deleteFamily);
+    nonTxTable.flushCommits();
+
+    txContext.start();
+    txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL);
+    expected = ImmutableList.of(
+      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier, txWp2, KeyValue.Type.DeleteColumn),
+      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2),
+      new KeyValue(TestBytes.row, TestBytes.family2, null, txWp2, KeyValue.Type.DeleteFamily),
+      new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3),
+      new KeyValue(TestBytes.row3, TestBytes.family, TestBytes.qualifier, now - 1, KeyValue.Type.Delete),
+      new KeyValue(TestBytes.row3, TestBytes.family2, null, now, KeyValue.Type.DeleteFamily)
+    );
+    // test scan
+    Scan scan = new Scan();
+    scan.setRaw(true);
+    verifyScan(txTable, scan, expected);
+    txContext.finish();
+  }
+
+  @Test
+  public void testFilters() throws Exception {
+    // Add some values to table
+    transactionContext.start();
+    Put put = new Put(TestBytes.row);
+    byte[] val1 = Bytes.toBytes(1L);
+    put.add(TestBytes.family, TestBytes.qualifier, val1);
+    transactionAwareHTable.put(put);
+    put = new Put(TestBytes.row2);
+    byte[] val2 = Bytes.toBytes(2L);
+    put.add(TestBytes.family, TestBytes.qualifier, val2);
+    transactionAwareHTable.put(put);
+    put = new Put(TestBytes.row3);
+    byte[] val3 = Bytes.toBytes(3L);
+    put.add(TestBytes.family, TestBytes.qualifier, val3);
+    transactionAwareHTable.put(put);
+    put = new Put(TestBytes.row4);
+    byte[] val4 = Bytes.toBytes(4L);
+    put.add(TestBytes.family, TestBytes.qualifier, val4);
+    transactionAwareHTable.put(put);
+    transactionContext.finish();
+
+    // Delete cell with value 2
+    transactionContext.start();
+    Delete delete = new Delete(TestBytes.row2);
+    delete.addColumn(TestBytes.family, TestBytes.qualifier);
+    transactionAwareHTable.delete(delete);
+    transactionContext.finish();
+
+    // Scan for values less than 4, should get only values 1 and 3
+    transactionContext.start();
+    Scan scan = new Scan(TestBytes.row, new ValueFilter(CompareFilter.CompareOp.LESS, new LongComparator(4)));
+    try (ResultScanner scanner = transactionAwareHTable.getScanner(scan)) {
+      Result result = scanner.next();
+      assertNotNull(result);
+      assertArrayEquals(TestBytes.row, result.getRow());
+      assertArrayEquals(val1, result.getValue(TestBytes.family, TestBytes.qualifier));
+      result = scanner.next();
+      assertNotNull(result);
+      assertArrayEquals(TestBytes.row3, result.getRow());
+      assertArrayEquals(val3, result.getValue(TestBytes.family, TestBytes.qualifier));
+      result = scanner.next();
+      assertNull(result);
+    }
+    transactionContext.finish();
+
+    // Run a Get with a filter for less than 10 on row4, should get value 4
+    transactionContext.start();
+    Get get = new Get(TestBytes.row4);
+    get.setFilter(new ValueFilter(CompareFilter.CompareOp.LESS, new LongComparator(10)));
+    Result result = transactionAwareHTable.get(get);
+    assertFalse(result.isEmpty());
+    assertArrayEquals(val4, result.getValue(TestBytes.family, TestBytes.qualifier));
+    transactionContext.finish();
+
+    // Change value of row4 to 40
+    transactionContext.start();
+    put = new Put(TestBytes.row4);
+    byte[] val40 = Bytes.toBytes(40L);
+    put.add(TestBytes.family, TestBytes.qualifier, val40);
+    transactionAwareHTable.put(put);
+    transactionContext.finish();
+
+    // Scan for values less than 10, should get only values 1 and 3
+    transactionContext.start();
+    scan = new Scan(TestBytes.row, new ValueFilter(CompareFilter.CompareOp.LESS, new LongComparator(10)));
+    try (ResultScanner scanner = transactionAwareHTable.getScanner(scan)) {
+      result = scanner.next();
+      assertNotNull(result);
+      assertArrayEquals(TestBytes.row, result.getRow());
+      assertArrayEquals(val1, result.getValue(TestBytes.family, TestBytes.qualifier));
+      result = scanner.next();
+      assertNotNull(result);
+      assertArrayEquals(TestBytes.row3, result.getRow());
+      assertArrayEquals(val3, result.getValue(TestBytes.family, TestBytes.qualifier));
+      result = scanner.next();
+      assertNull(result);
+    }
+    transactionContext.finish();
+
+    // Run the Get again with a filter for less than 10 on row4, this time should not get any results
+    transactionContext.start();
+    result = transactionAwareHTable.get(get);
+    assertTrue(result.isEmpty());
+    transactionContext.finish();
+  }
+
+  /**
+   * Tests that transaction co-processor works with older clients
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testOlderClientOperations() throws Exception {
+    // Use old HTable to test
+    TransactionAwareHTable oldTxAware = new OldTransactionAwareHTable(hTable);
+    transactionContext.addTransactionAware(oldTxAware);
+
+    transactionContext.start();
+    Put put = new Put(TestBytes.row);
+    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+    oldTxAware.put(put);
+    transactionContext.finish();
+
+    transactionContext.start();
+    long txId = transactionContext.getCurrentTransaction().getTransactionId();
+    put = new Put(TestBytes.row);
+    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value2);
+    oldTxAware.put(put);
+    // Invalidate the second Put
+    TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager);
+    txClient.invalidate(txId);
+
+    transactionContext.start();
+    put = new Put(TestBytes.row);
+    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value3);
+    oldTxAware.put(put);
+    // Abort the third Put
+    transactionContext.abort();
+
+    // Get should now return the first value
+    transactionContext.start();
+    Result result = oldTxAware.get(new Get(TestBytes.row));
+    transactionContext.finish();
+
+    byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
+    assertArrayEquals(TestBytes.value, value);
+  }
+
+  /**
+   * Represents older transaction clients
+   */
+  private static class OldTransactionAwareHTable extends TransactionAwareHTable {
+    public OldTransactionAwareHTable(HTableInterface hTable) {
+      super(hTable);
+    }
+
+    @Override
+    public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException {
+      op.setAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx));
+    }
+
+    @Override
+    protected void makeRollbackOperation(Delete delete) {
+      delete.setAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2246abff/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/CellSkipFilterTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/CellSkipFilterTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/CellSkipFilterTest.java
new file mode 100644
index 0000000..428d3b0
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/CellSkipFilterTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * HBase 1.1 specific test for testing {@link CellSkipFilter}.
+ */
+public class CellSkipFilterTest {
+
+  private static final String ROW1KEY = "row1";
+  private static final String ROW2KEY = "row2";
+  private static final String FAM1KEY = "fam1";
+  private static final String COL1KEY = "col1";
+  private static final String FAM2KEY = "fam2";
+  private static final String COL2KEY = "col2";
+  private static final String VALUE = "value";
+
+  @Test
+  public void testSkipFiltering() throws Exception {
+    long timestamp = System.currentTimeMillis();
+    // Test to check that we get NEXT_COL once the INCLUDE_AND_NEXT_COL is returned for the same key
+    Filter filter = new CellSkipFilter(new MyFilter(0));
+    assertEquals(Filter.ReturnCode.INCLUDE, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE,
+                                                                              timestamp)));
+    assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY,
+                                                                                           VALUE, timestamp - 1)));
+
+    // Next call should get NEXT_COL instead of SKIP, as it would be returned by CellSkipFilter
+    assertEquals(Filter.ReturnCode.NEXT_COL, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE,
+                                                                               timestamp - 2)));
+
+    // Next call with the same key should return the NEXT_COL again, as it would be returned by CellSkipFilter
+    assertEquals(Filter.ReturnCode.NEXT_COL, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE,
+                                                                               timestamp - 3)));
+
+    // Since MyFilter counter is not incremented in the previous call, filtering for the different keyvalue should
+    // give SKIP from MyFilter
+    assertEquals(Filter.ReturnCode.SKIP, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM2KEY, COL1KEY, VALUE,
+                                                                           timestamp - 4)));
+
+    // Test to check that we get NEXT_COL once the NEXT_COL is returned for the same key
+    filter = new CellSkipFilter(new MyFilter(2));
+    assertEquals(Filter.ReturnCode.SKIP, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE,
+                                                                           timestamp)));
+    assertEquals(Filter.ReturnCode.NEXT_COL, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE,
+                                                                               timestamp - 1)));
+
+    // Next call should get NEXT_COL instead of NEXT_ROW, as it would be returned by CellSkipFilter
+    assertEquals(Filter.ReturnCode.NEXT_COL, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE,
+                                                                               timestamp - 2)));
+
+    // Next call with the same key should return the NEXT_COL again, as it would be returned by CellSkipFilter
+    assertEquals(Filter.ReturnCode.NEXT_COL, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE,
+                                                                               timestamp - 3)));
+
+    // Since MyFilter counter is not incremented in the previous call, filtering for the different keyvalue should
+    // give NEXT_ROW from MyFilter
+    assertEquals(Filter.ReturnCode.NEXT_ROW, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL2KEY, VALUE,
+                                                                               timestamp - 4)));
+
+    // Next call with the new key should returned the SEEK_NEXT_USING_HINT
+    assertEquals(Filter.ReturnCode.SEEK_NEXT_USING_HINT, filter.filterKeyValue(newKeyValue(ROW2KEY, FAM1KEY, COL1KEY,
+                                                                                           VALUE, timestamp - 5)));
+  }
+
+  private KeyValue newKeyValue(String rowkey, String family, String column, String value, long timestamp) {
+    return new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes(family), Bytes.toBytes(column),
+                        timestamp, Bytes.toBytes(value));
+  }
+
+  /**
+   * Sample filter for testing. This filter maintains the {@link List} of {@link ReturnCode}s. It accepts the
+   * start index in the list and start serving the return codes corresponding that that index. Every time the
+   * return code is served, index is incremented.
+   */
+  class MyFilter extends FilterBase {
+
+    private final List<ReturnCode> returnCodes;
+    private int counter;
+
+    public MyFilter(int startIndex) {
+      returnCodes = Arrays.asList(ReturnCode.INCLUDE, ReturnCode.INCLUDE_AND_NEXT_COL, ReturnCode.SKIP,
+                                  ReturnCode.NEXT_COL, ReturnCode.NEXT_ROW, ReturnCode.SEEK_NEXT_USING_HINT);
+      counter = startIndex;
+    }
+
+    @Override
+    public ReturnCode filterKeyValue(Cell cell) throws IOException {
+      ReturnCode code = returnCodes.get(counter % returnCodes.size());
+      counter++;
+      return code;
+    }
+  }
+}