Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/11 19:59:03 UTC

[25/30] incubator-tephra git commit: TEPHRA-168 Remove HBase version from co-processor package names

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/d00d9f6d/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessorTest.java
deleted file mode 100644
index 5d49ac1..0000000
--- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessorTest.java
+++ /dev/null
@@ -1,771 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tephra.hbase96.coprocessor;
-
-import com.google.common.collect.ImmutableSortedMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import it.unimi.dsi.fastutil.longs.LongArrayList;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.catalog.CatalogTracker;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.executor.ExecutorService;
-import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.ipc.RpcServerInterface;
-import org.apache.hadoop.hbase.master.TableLockManager;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
-import org.apache.hadoop.hbase.regionserver.FlushRequester;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
-import org.apache.hadoop.hbase.regionserver.Leases;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.regionserver.ScanType;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.tephra.ChangeId;
-import org.apache.tephra.Transaction;
-import org.apache.tephra.TransactionManager;
-import org.apache.tephra.TransactionType;
-import org.apache.tephra.TxConstants;
-import org.apache.tephra.coprocessor.TransactionStateCache;
-import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
-import org.apache.tephra.metrics.TxMetricsCollector;
-import org.apache.tephra.persist.HDFSTransactionStateStorage;
-import org.apache.tephra.persist.TransactionSnapshot;
-import org.apache.tephra.persist.TransactionVisibilityState;
-import org.apache.tephra.snapshot.DefaultSnapshotCodec;
-import org.apache.tephra.snapshot.SnapshotCodecProvider;
-import org.apache.tephra.util.TxUtils;
-import org.apache.zookeeper.KeeperException;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests filtering of invalid transaction data by the {@link TransactionProcessor} coprocessor.
- */
-public class TransactionProcessorTest {
-  private static final Logger LOG = LoggerFactory.getLogger(TransactionProcessorTest.class);
-
-  // 9 timestamps (V[0] through V[8]), 1 hour apart; the latest is the current ts.
-  private static final long[] V;
-
-  static {
-    long now = System.currentTimeMillis();
-    V = new long[9];
-    for (int i = 0; i < V.length; i++) {
-      V[i] = (now - TimeUnit.HOURS.toMillis(8 - i)) * TxConstants.MAX_TX_PER_MS;
-    }
-  }
-
-  @ClassRule
-  public static TemporaryFolder tmpFolder = new TemporaryFolder();
-  private static MiniDFSCluster dfsCluster;
-  private static Configuration conf;
-  private static LongArrayList invalidSet = new LongArrayList(new long[]{V[3], V[5], V[7]});
-  private static TransactionVisibilityState txVisibilityState;
-
-  @BeforeClass
-  public static void setupBeforeClass() throws Exception {
-    Configuration hConf = new Configuration();
-    String rootDir = tmpFolder.newFolder().getAbsolutePath();
-    hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, rootDir);
-    hConf.set(HConstants.HBASE_DIR, rootDir + "/hbase");
-
-    dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build();
-    dfsCluster.waitActive();
-    conf = HBaseConfiguration.create(dfsCluster.getFileSystem().getConf());
-
-    conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
-    conf.unset(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES);
-    String localTestDir = tmpFolder.newFolder().getAbsolutePath();
-    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, localTestDir);
-    conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
-
-    // write an initial transaction snapshot
-    TransactionSnapshot txSnapshot =
-      TransactionSnapshot.copyFrom(
-        System.currentTimeMillis(), V[6] - 1, V[7], invalidSet,
-        // this will set visibility upper bound to V[6]
-        Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(V[6] - 1, Long.MAX_VALUE,
-                                                                                        TransactionType.SHORT))),
-        new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>());
-    txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(),
-                                                txSnapshot.getWritePointer(), txSnapshot.getInvalid(),
-                                                txSnapshot.getInProgress());
-    HDFSTransactionStateStorage tmpStorage =
-      new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
-    tmpStorage.startAndWait();
-    tmpStorage.writeSnapshot(txSnapshot);
-    tmpStorage.stopAndWait();
-  }
-
-  @AfterClass
-  public static void shutdownAfterClass() throws Exception {
-    dfsCluster.shutdown();
-  }
-
-  @Test
-  public void testDataJanitorRegionScanner() throws Exception {
-    String tableName = "TestRegionScanner";
-    byte[] familyBytes = Bytes.toBytes("f");
-    byte[] columnBytes = Bytes.toBytes("c");
-    HRegion region = createRegion(tableName, familyBytes, TimeUnit.HOURS.toMillis(3));
-    try {
-      region.initialize();
-      TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
-
-      for (int i = 1; i <= 8; i++) {
-        for (int k = 1; k <= i; k++) {
-          Put p = new Put(Bytes.toBytes(i));
-          p.add(familyBytes, columnBytes, V[k], Bytes.toBytes(V[k]));
-          region.put(p);
-        }
-      }
-
-      List<Cell> results = Lists.newArrayList();
-
-      // force a flush to clear the data
-      // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
-      LOG.info("Flushing region " + region.getRegionNameAsString());
-      region.flushcache();
-
-      // now a normal scan should only return the valid rows - testing that cleanup works on flush
-      Scan scan = new Scan();
-      scan.setMaxVersions(10);
-      RegionScanner regionScanner = region.getScanner(scan);
-
-      // first returned value should be "4" with version "4"
-      results.clear();
-      assertTrue(regionScanner.next(results));
-      assertKeyValueMatches(results, 4, new long[] {V[4]});
-
-      results.clear();
-      assertTrue(regionScanner.next(results));
-      assertKeyValueMatches(results, 5, new long[] {V[4]});
-
-      results.clear();
-      assertTrue(regionScanner.next(results));
-      assertKeyValueMatches(results, 6, new long[] {V[6], V[4]});
-
-      results.clear();
-      assertTrue(regionScanner.next(results));
-      assertKeyValueMatches(results, 7, new long[] {V[6], V[4]});
-
-      results.clear();
-      assertFalse(regionScanner.next(results));
-      assertKeyValueMatches(results, 8, new long[] {V[8], V[6], V[4]});
-    } finally {
-      region.close();
-    }
-  }
-
-  @Test
-  public void testDeleteFiltering() throws Exception {
-    String tableName = "TestDeleteFiltering";
-    byte[] familyBytes = Bytes.toBytes("f");
-    byte[] columnBytes = Bytes.toBytes("c");
-    HRegion region = createRegion(tableName, familyBytes, 0);
-    try {
-      region.initialize();
-      TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
-
-      byte[] row = Bytes.toBytes(1);
-      for (int i = 4; i < V.length; i++) {
-        Put p = new Put(row);
-        p.add(familyBytes, columnBytes, V[i], Bytes.toBytes(V[i]));
-        region.put(p);
-      }
-
-      // delete from the third entry back
-      // take that cell's timestamp + 1 to simulate a delete in a new tx
-      long deleteTs = V[5] + 1;
-      Delete d = new Delete(row, deleteTs);
-      LOG.info("Issuing delete at timestamp " + deleteTs);
-      // row deletes are not yet supported (TransactionAwareHTable normally handles this)
-      d.deleteColumns(familyBytes, columnBytes, deleteTs);
-      region.delete(d);
-
-      List<Cell> results = Lists.newArrayList();
-
-      // force a flush to clear the data
-      // during flush, we should drop the deleted version, but not the others
-      LOG.info("Flushing region " + region.getRegionNameAsString());
-      region.flushcache();
-
-      // now a normal scan should return row with versions at: V[8], V[6].
-      // V[7] is invalid and V[5] and prior are deleted.
-      Scan scan = new Scan();
-      scan.setMaxVersions(10);
-      RegionScanner regionScanner = region.getScanner(scan);
-      // should be only one row
-      assertFalse(regionScanner.next(results));
-      assertKeyValueMatches(results, 1,
-                            new long[]{V[8], V[6], deleteTs},
-                            new byte[][]{Bytes.toBytes(V[8]), Bytes.toBytes(V[6]), new byte[0]});
-    } finally {
-      region.close();
-    }
-  }
-
-  @Test
-  public void testDeleteMarkerCleanup() throws Exception {
-    String tableName = "TestDeleteMarkerCleanup";
-    byte[] familyBytes = Bytes.toBytes("f");
-    HRegion region = createRegion(tableName, familyBytes, 0);
-    try {
-      region.initialize();
-
-      // all puts use a timestamp before the tx snapshot's visibility upper bound, making them eligible for removal
-      long writeTs = txVisibilityState.getVisibilityUpperBound() - 10;
-      // deletes are performed after the writes, but still before the visibility upper bound
-      long deleteTs = writeTs + 1;
-      // write separate columns to confirm that delete markers survive across flushes
-      byte[] row = Bytes.toBytes(100);
-      Put p = new Put(row);
-
-      LOG.info("Writing columns at timestamp " + writeTs);
-      for (int i = 0; i < 5; i++) {
-        byte[] iBytes = Bytes.toBytes(i);
-        p.add(familyBytes, iBytes, writeTs, iBytes);
-      }
-      region.put(p);
-      // read all back
-      Scan scan = new Scan(row);
-      RegionScanner regionScanner = region.getScanner(scan);
-      List<Cell> results = Lists.newArrayList();
-      assertFalse(regionScanner.next(results));
-      for (int i = 0; i < 5; i++) {
-        Cell cell = results.get(i);
-        assertArrayEquals(row, cell.getRow());
-        byte[] idxBytes = Bytes.toBytes(i);
-        assertArrayEquals(idxBytes, cell.getQualifier());
-        assertArrayEquals(idxBytes, cell.getValue());
-      }
-
-      // force a flush to clear the memstore
-      LOG.info("Before delete, flushing region " + region.getRegionNameAsString());
-      region.flushcache();
-
-      // delete the odd entries
-      for (int i = 0; i < 5; i++) {
-        if (i % 2 == 1) {
-          // deletes are performed as puts with empty values
-          Put deletePut = new Put(row);
-          deletePut.add(familyBytes, Bytes.toBytes(i), deleteTs, new byte[0]);
-          region.put(deletePut);
-        }
-      }
-
-      // read all back
-      scan = new Scan(row);
-      scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState),
-                                                            new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN));
-      regionScanner = region.getScanner(scan);
-      results = Lists.newArrayList();
-      assertFalse(regionScanner.next(results));
-      assertEquals(3, results.size());
-      // only even columns should exist
-      for (int i = 0; i < 3; i++) {
-        Cell cell = results.get(i);
-        LOG.info("Got cell " + cell);
-        assertArrayEquals(row, cell.getRow());
-        byte[] idxBytes = Bytes.toBytes(i * 2);
-        assertArrayEquals(idxBytes, cell.getQualifier());
-        assertArrayEquals(idxBytes, cell.getValue());
-      }
-
-      // force another flush on the delete markers
-      // during flush, we should retain the delete markers, since they can only safely be dropped by a major compaction
-      LOG.info("After delete, flushing region " + region.getRegionNameAsString());
-      region.flushcache();
-
-      scan = new Scan(row);
-      scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState),
-                                                            new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN));
-
-      regionScanner = region.getScanner(scan);
-      results = Lists.newArrayList();
-      assertFalse(regionScanner.next(results));
-      assertEquals(3, results.size());
-      // only even columns should exist
-      for (int i = 0; i < 3; i++) {
-        Cell cell = results.get(i);
-        assertArrayEquals(row, cell.getRow());
-        byte[] idxBytes = Bytes.toBytes(i * 2);
-        assertArrayEquals(idxBytes, cell.getQualifier());
-        assertArrayEquals(idxBytes, cell.getValue());
-      }
-
-      // force a major compaction
-      LOG.info("Forcing major compaction of region " + region.getRegionNameAsString());
-      region.compactStores(true);
-
-      // perform a raw scan (no filter) to confirm that the delete markers are now gone
-      scan = new Scan(row);
-      regionScanner = region.getScanner(scan);
-      results = Lists.newArrayList();
-      assertFalse(regionScanner.next(results));
-      assertEquals(3, results.size());
-      // only even columns should exist
-      for (int i = 0; i < 3; i++) {
-        Cell cell = results.get(i);
-        assertArrayEquals(row, cell.getRow());
-        byte[] idxBytes = Bytes.toBytes(i * 2);
-        assertArrayEquals(idxBytes, cell.getQualifier());
-        assertArrayEquals(idxBytes, cell.getValue());
-      }
-    } finally {
-      region.close();
-    }
-  }
-
-  /**
-   * Test that we correctly preserve the timestamp set on column family delete markers.  This is not
-   * strictly required for TransactionAwareHTable usage, but it is the right thing to do and makes it
-   * easy to interoperate with other systems.
-   */
-  @Test
-  public void testFamilyDeleteTimestamp() throws Exception {
-    String tableName = "TestFamilyDeleteTimestamp";
-    byte[] family1Bytes = Bytes.toBytes("f1");
-    byte[] columnBytes = Bytes.toBytes("c");
-    byte[] rowBytes = Bytes.toBytes("row");
-    byte[] valBytes = Bytes.toBytes("val");
-    HRegion region = createRegion(tableName, family1Bytes, 0);
-    try {
-      region.initialize();
-
-      long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS;
-      Put p = new Put(rowBytes);
-      p.add(family1Bytes, columnBytes, now - 10, valBytes);
-      region.put(p);
-
-      // issue a family delete with an explicit timestamp
-      Delete delete = new Delete(rowBytes, now);
-      delete.deleteFamily(family1Bytes, now - 5);
-      region.delete(delete);
-
-      // test that the delete marker preserved the timestamp
-      Scan scan = new Scan();
-      scan.setMaxVersions();
-      RegionScanner scanner = region.getScanner(scan);
-      List<Cell> results = Lists.newArrayList();
-      scanner.next(results);
-      assertEquals(2, results.size());
-      // delete marker should appear first
-      Cell cell = results.get(0);
-      assertArrayEquals(new byte[0], cell.getQualifier());
-      assertArrayEquals(new byte[0], cell.getValue());
-      assertEquals(now - 5, cell.getTimestamp());
-      // since this is an unfiltered scan against the region, the original put should be next
-      cell = results.get(1);
-      assertArrayEquals(valBytes, cell.getValue());
-      assertEquals(now - 10, cell.getTimestamp());
-      scanner.close();
-
-      // with a filtered scan the original put should disappear
-      scan = new Scan();
-      scan.setMaxVersions();
-      scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState),
-                                                            new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN));
-
-      scanner = region.getScanner(scan);
-      results = Lists.newArrayList();
-      scanner.next(results);
-      assertEquals(0, results.size());
-      scanner.close();
-    } finally {
-      region.close();
-    }
-  }
-
-  @Test
-  public void testPreExistingData() throws Exception {
-    String tableName = "TestPreExistingData";
-    byte[] familyBytes = Bytes.toBytes("f");
-    long ttlMillis = TimeUnit.DAYS.toMillis(14);
-    HRegion region = createRegion(tableName, familyBytes, ttlMillis);
-    try {
-      region.initialize();
-
-      // timestamps for pre-existing, non-transactional data
-      long now = txVisibilityState.getVisibilityUpperBound() / TxConstants.MAX_TX_PER_MS;
-      long older = now - ttlMillis / 2;
-      long newer = now - ttlMillis / 3;
-      // timestamps for transactional data
-      long nowTx = txVisibilityState.getVisibilityUpperBound();
-      long olderTx = nowTx - (ttlMillis / 2) * TxConstants.MAX_TX_PER_MS;
-      long newerTx = nowTx - (ttlMillis / 3) * TxConstants.MAX_TX_PER_MS;
-
-      Map<byte[], Long> ttls = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
-      ttls.put(familyBytes, ttlMillis);
-
-      List<Cell> cells = new ArrayList<>();
-      cells.add(new KeyValue(Bytes.toBytes("r1"), familyBytes, Bytes.toBytes("c1"), older, Bytes.toBytes("v11")));
-      cells.add(new KeyValue(Bytes.toBytes("r1"), familyBytes, Bytes.toBytes("c2"), newer, Bytes.toBytes("v12")));
-      cells.add(new KeyValue(Bytes.toBytes("r2"), familyBytes, Bytes.toBytes("c1"), older, Bytes.toBytes("v21")));
-      cells.add(new KeyValue(Bytes.toBytes("r2"), familyBytes, Bytes.toBytes("c2"), newer, Bytes.toBytes("v22")));
-      cells.add(new KeyValue(Bytes.toBytes("r3"), familyBytes, Bytes.toBytes("c1"), olderTx, Bytes.toBytes("v31")));
-      cells.add(new KeyValue(Bytes.toBytes("r3"), familyBytes, Bytes.toBytes("c2"), newerTx, Bytes.toBytes("v32")));
-
-      // Write non-transactional and transactional data
-      for (Cell c : cells) {
-        region.put(new Put(c.getRow()).add(c.getFamily(), c.getQualifier(), c.getTimestamp(), c.getValue()));
-      }
-
-      Scan rawScan = new Scan();
-      rawScan.setMaxVersions();
-
-      Transaction dummyTransaction = TxUtils.createDummyTransaction(txVisibilityState);
-      Scan txScan = new Scan();
-      txScan.setMaxVersions();
-      txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true),
-                          TxUtils.getMaxVisibleTimestamp(dummyTransaction));
-      txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN));
-
-      // read all back with raw scanner
-      scanAndAssert(region, cells, rawScan);
-
-      // read all back with transaction filter
-      scanAndAssert(region, cells, txScan);
-
-      // force a flush to clear the memstore
-      region.flushcache();
-      scanAndAssert(region, cells, txScan);
-
-      // force a major compaction to remove any expired cells
-      region.compactStores(true);
-      scanAndAssert(region, cells, txScan);
-
-      // Reduce TTL, this should make cells with timestamps older and olderTx expire
-      long newTtl = ttlMillis / 2 - 1;
-      region = updateTtl(region, familyBytes, newTtl);
-      ttls.put(familyBytes, newTtl);
-      txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true),
-                          TxUtils.getMaxVisibleTimestamp(dummyTransaction));
-      txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN));
-
-      // Raw scan should still give all cells
-      scanAndAssert(region, cells, rawScan);
-      // However, tx scan should not return expired cells
-      scanAndAssert(region, select(cells, 1, 3, 5), txScan);
-
-      region.flushcache();
-      scanAndAssert(region, cells, rawScan);
-
-      // force a major compaction to remove any expired cells
-      region.compactStores(true);
-      // This time raw scan too should not return expired cells, as they would be dropped during major compaction
-      scanAndAssert(region, select(cells, 1, 3, 5), rawScan);
-
-      // Reduce TTL again to 1 ms, this should expire all cells
-      newTtl = 1;
-      region = updateTtl(region, familyBytes, newTtl);
-      ttls.put(familyBytes, newTtl);
-      txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true),
-                          TxUtils.getMaxVisibleTimestamp(dummyTransaction));
-      txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN));
-
-      // force a major compaction to remove expired cells
-      region.compactStores(true);
-      // This time raw scan should not return any cells, as all cells have expired.
-      scanAndAssert(region, Collections.<Cell>emptyList(), rawScan);
-    } finally {
-      region.close();
-    }
-  }
-
-  private List<Cell> select(List<Cell> cells, int... indexes) {
-    List<Cell> newCells = new ArrayList<>();
-    for (int i : indexes) {
-      newCells.add(cells.get(i));
-    }
-    return newCells;
-  }
-
-  @SuppressWarnings("StatementWithEmptyBody")
-  private void scanAndAssert(HRegion region, List<Cell> expected, Scan scan) throws Exception {
-    try (RegionScanner regionScanner = region.getScanner(scan)) {
-      List<Cell> results = Lists.newArrayList();
-      while (regionScanner.next(results)) { }
-      assertEquals(expected, results);
-    }
-  }
-
-  private HRegion updateTtl(HRegion region, byte[] family, long ttl) throws Exception {
-    region.close();
-    HTableDescriptor htd = region.getTableDesc();
-    HColumnDescriptor cfd = new HColumnDescriptor(family);
-    if (ttl > 0) {
-      cfd.setValue(TxConstants.PROPERTY_TTL, String.valueOf(ttl));
-    }
-    cfd.setMaxVersions(10);
-    htd.addFamily(cfd);
-    return HRegion.openHRegion(region.getRegionInfo(), htd, region.getLog(), conf,
-                               new MockRegionServerServices(conf, null), null);
-  }
-
-  private HRegion createRegion(String tableName, byte[] family, long ttl) throws IOException {
-    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
-    HColumnDescriptor cfd = new HColumnDescriptor(family);
-    if (ttl > 0) {
-      cfd.setValue(TxConstants.PROPERTY_TTL, String.valueOf(ttl));
-    }
-    cfd.setMaxVersions(10);
-    htd.addFamily(cfd);
-    htd.addCoprocessor(TransactionProcessor.class.getName());
-    Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), htd.getTableName());
-    Path hlogPath = new Path(FSUtils.getRootDir(conf) + "/hlog");
-    FileSystem fs = FileSystem.get(conf);
-    assertTrue(fs.mkdirs(tablePath));
-    HLog hLog = HLogFactory.createHLog(fs, hlogPath, tableName, conf);
-    HRegionInfo regionInfo = new HRegionInfo(TableName.valueOf(tableName));
-    HRegionFileSystem regionFS = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tablePath, regionInfo);
-    return new HRegion(regionFS, hLog, conf, htd, new MockRegionServerServices(conf, null));
-  }
-
-  private void assertKeyValueMatches(List<Cell> results, int index, long[] versions) {
-    byte[][] values = new byte[versions.length][];
-    for (int i = 0; i < versions.length; i++) {
-      values[i] = Bytes.toBytes(versions[i]);
-    }
-    assertKeyValueMatches(results, index, versions, values);
-  }
-
-  private void assertKeyValueMatches(List<Cell> results, int index, long[] versions, byte[][] values) {
-    assertEquals(versions.length, results.size());
-    assertEquals(values.length, results.size());
-    for (int i = 0; i < versions.length; i++) {
-      Cell kv = results.get(i);
-      assertArrayEquals(Bytes.toBytes(index), kv.getRow());
-      assertEquals(versions[i], kv.getTimestamp());
-      assertArrayEquals(values[i], kv.getValue());
-    }
-  }
-
-  @Test
-  public void testTransactionStateCache() throws Exception {
-    TransactionStateCache cache = new TransactionStateCache();
-    cache.setConf(conf);
-    cache.startAndWait();
-    // verify that the transaction snapshot read matches what we wrote in setupBeforeClass()
-    TransactionVisibilityState cachedSnapshot = cache.getLatestState();
-    assertNotNull(cachedSnapshot);
-    assertEquals(invalidSet, cachedSnapshot.getInvalid());
-    cache.stopAndWait();
-  }
-
-  private static class MockRegionServerServices implements RegionServerServices {
-    private final Configuration hConf;
-    private final ZooKeeperWatcher zookeeper;
-    private final Map<String, HRegion> regions = new HashMap<String, HRegion>();
-    private boolean stopping = false;
-    private final ConcurrentSkipListMap<byte[], Boolean> rit = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
-    private HFileSystem hfs = null;
-    private ServerName serverName = null;
-    private RpcServerInterface rpcServer = null;
-    private volatile boolean abortRequested;
-
-    public MockRegionServerServices(Configuration hConf, ZooKeeperWatcher zookeeper) {
-      this.hConf = hConf;
-      this.zookeeper = zookeeper;
-    }
-
-    @Override
-    public boolean isStopping() {
-      return stopping;
-    }
-
-    @Override
-    public HLog getWAL(HRegionInfo regionInfo) throws IOException {
-      return null;
-    }
-
-    @Override
-    public CompactionRequestor getCompactionRequester() {
-      return null;
-    }
-
-    @Override
-    public FlushRequester getFlushRequester() {
-      return null;
-    }
-
-    @Override
-    public RegionServerAccounting getRegionServerAccounting() {
-      return null;
-    }
-
-    @Override
-    public TableLockManager getTableLockManager() {
-      return new TableLockManager.NullTableLockManager();
-    }
-
-    @Override
-    public void postOpenDeployTasks(HRegion r, CatalogTracker ct) throws KeeperException, IOException {
-    }
-
-    @Override
-    public RpcServerInterface getRpcServer() {
-      return rpcServer;
-    }
-
-    @Override
-    public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
-      return rit;
-    }
-
-    @Override
-    public FileSystem getFileSystem() {
-      return hfs;
-    }
-
-    @Override
-    public Leases getLeases() {
-      return null;
-    }
-
-    @Override
-    public ExecutorService getExecutorService() {
-      return null;
-    }
-
-    @Override
-    public CatalogTracker getCatalogTracker() {
-      return null;
-    }
-
-    @Override
-    public Map<String, HRegion> getRecoveringRegions() {
-      return null;
-    }
-
-    @Override
-    public void updateRegionFavoredNodesMapping(String encodedRegionName, List<HBaseProtos.ServerName> favoredNodes) {
-    }
-
-    @Override
-    public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
-      return new InetSocketAddress[0];
-    }
-
-    @Override
-    public void addToOnlineRegions(HRegion r) {
-      regions.put(r.getRegionNameAsString(), r);
-    }
-
-    @Override
-    public boolean removeFromOnlineRegions(HRegion r, ServerName destination) {
-      return regions.remove(r.getRegionInfo().getEncodedName()) != null;
-    }
-
-    @Override
-    public HRegion getFromOnlineRegions(String encodedRegionName) {
-      return regions.get(encodedRegionName);
-    }
-
-    @Override
-    public List<HRegion> getOnlineRegions(TableName tableName) throws IOException {
-      return null;
-    }
-
-    @Override
-    public Configuration getConfiguration() {
-      return hConf;
-    }
-
-    @Override
-    public ZooKeeperWatcher getZooKeeper() {
-      return zookeeper;
-    }
-
-    @Override
-    public ServerName getServerName() {
-      return serverName;
-    }
-
-    @Override
-    public void abort(String why, Throwable e) {
-      this.abortRequested = true;
-    }
-
-    @Override
-    public boolean isAborted() {
-      return abortRequested;
-    }
-
-    @Override
-    public void stop(String why) {
-      this.stopping = true;
-    }
-
-    @Override
-    public boolean isStopped() {
-      return stopping;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/d00d9f6d/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/coprocessor/TransactionVisibilityFilterTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/coprocessor/TransactionVisibilityFilterTest.java
deleted file mode 100644
index 678ba5a..0000000
--- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/coprocessor/TransactionVisibilityFilterTest.java
+++ /dev/null
@@ -1,374 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tephra.hbase96.coprocessor;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterBase;
-import org.apache.hadoop.hbase.regionserver.ScanType;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.tephra.Transaction;
-import org.apache.tephra.TxConstants;
-import org.apache.tephra.hbase.AbstractTransactionVisibilityFilterTest;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * HBase 0.96 specific test for filtering logic applied when reading data transactionally.
- */
-public class TransactionVisibilityFilterTest extends AbstractTransactionVisibilityFilterTest {
-  /**
-   * Test filtering of KeyValues for in-progress and invalid transactions.
-   * @throws Exception
-   */
-  @Test
-  public void testFiltering() throws Exception {
-    TxFilterFactory txFilterFactory = new TxFilterFactory() {
-      @Override
-      public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
-        return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN);
-      }
-    };
-    runFilteringTest(txFilterFactory,
-                     ImmutableList.of(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                                      Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                                      Filter.ReturnCode.SKIP,
-                                      Filter.ReturnCode.SKIP,
-                                      Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                                      Filter.ReturnCode.INCLUDE_AND_NEXT_COL));
-  }
-
-  @Test
-  public void testSubFilter() throws Exception {
-    final FilterBase includeFilter = new FilterBase() {
-      @Override
-      public ReturnCode filterKeyValue(Cell ignored) throws IOException {
-        return ReturnCode.INCLUDE;
-      }
-    };
-    TxFilterFactory txFilterFactory = new TxFilterFactory() {
-      @Override
-      public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
-        return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter);
-      }
-    };
-    runFilteringTest(txFilterFactory,
-                     ImmutableList.of(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                                      Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                                      Filter.ReturnCode.SKIP,
-                                      Filter.ReturnCode.SKIP,
-                                      Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                                      Filter.ReturnCode.INCLUDE_AND_NEXT_COL));
-
-    final Filter skipFilter = new FilterBase() {
-      @Override
-      public ReturnCode filterKeyValue(Cell ignored) throws IOException {
-        return ReturnCode.SKIP;
-      }
-    };
-    txFilterFactory = new TxFilterFactory() {
-      @Override
-      public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
-        return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter);
-      }
-    };
-    runFilteringTest(txFilterFactory,
-                     ImmutableList.of(Filter.ReturnCode.NEXT_COL,
-                                      Filter.ReturnCode.NEXT_COL,
-                                      Filter.ReturnCode.SKIP,
-                                      Filter.ReturnCode.SKIP,
-                                      Filter.ReturnCode.NEXT_COL,
-                                      Filter.ReturnCode.NEXT_COL));
-
-    final Filter includeNextFilter = new FilterBase() {
-      @Override
-      public ReturnCode filterKeyValue(Cell ignored) throws IOException {
-        return ReturnCode.INCLUDE_AND_NEXT_COL;
-      }
-    };
-    txFilterFactory = new TxFilterFactory() {
-      @Override
-      public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
-        return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter);
-      }
-    };
-    runFilteringTest(txFilterFactory,
-                     ImmutableList.of(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                                      Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                                      Filter.ReturnCode.SKIP,
-                                      Filter.ReturnCode.SKIP,
-                                      Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                                      Filter.ReturnCode.INCLUDE_AND_NEXT_COL));
-
-    final Filter nextColFilter = new FilterBase() {
-      @Override
-      public ReturnCode filterKeyValue(Cell ignored) throws IOException {
-        return ReturnCode.NEXT_COL;
-      }
-    };
-    txFilterFactory = new TxFilterFactory() {
-      @Override
-      public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
-        return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter);
-      }
-    };
-    runFilteringTest(txFilterFactory,
-                     ImmutableList.of(Filter.ReturnCode.NEXT_COL,
-                                      Filter.ReturnCode.NEXT_COL,
-                                      Filter.ReturnCode.SKIP,
-                                      Filter.ReturnCode.SKIP,
-                                      Filter.ReturnCode.NEXT_COL,
-                                      Filter.ReturnCode.NEXT_COL));
-
-  }
-
-  @Test
-  public void testSubFilterOverride() throws Exception {
-    final FilterBase includeFilter = new FilterBase() {
-      @Override
-      public ReturnCode filterKeyValue(Cell ignored) throws IOException {
-        return ReturnCode.INCLUDE;
-      }
-    };
-    TxFilterFactory txFilterFactory = new TxFilterFactory() {
-      @Override
-      public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
-        return new CustomTxFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter);
-      }
-    };
-    runFilteringTest(txFilterFactory,
-                     ImmutableList.of(Filter.ReturnCode.INCLUDE,
-                                      Filter.ReturnCode.INCLUDE,
-                                      Filter.ReturnCode.SKIP,
-                                      Filter.ReturnCode.SKIP,
-                                      Filter.ReturnCode.INCLUDE,
-                                      Filter.ReturnCode.INCLUDE));
-
-    final Filter skipFilter = new FilterBase() {
-      @Override
-      public ReturnCode filterKeyValue(Cell ignored) throws IOException {
-        return ReturnCode.SKIP;
-      }
-    };
-    txFilterFactory = new TxFilterFactory() {
-      @Override
-      public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
-        return new CustomTxFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter);
-      }
-    };
-    runFilteringTest(txFilterFactory,
-                     ImmutableList.of(Filter.ReturnCode.NEXT_COL,
-                                      Filter.ReturnCode.NEXT_COL,
-                                      Filter.ReturnCode.SKIP,
-                                      Filter.ReturnCode.SKIP,
-                                      Filter.ReturnCode.NEXT_COL,
-                                      Filter.ReturnCode.NEXT_COL));
-
-    final Filter includeNextFilter = new FilterBase() {
-      @Override
-      public ReturnCode filterKeyValue(Cell ignored) throws IOException {
-        return ReturnCode.INCLUDE_AND_NEXT_COL;
-      }
-    };
-    txFilterFactory = new TxFilterFactory() {
-      @Override
-      public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
-        return new CustomTxFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter);
-      }
-    };
-    runFilteringTest(txFilterFactory,
-                     ImmutableList.of(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                                      Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                                      Filter.ReturnCode.SKIP,
-                                      Filter.ReturnCode.SKIP,
-                                      Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                                      Filter.ReturnCode.INCLUDE_AND_NEXT_COL));
-
-    final Filter nextColFilter = new FilterBase() {
-      @Override
-      public ReturnCode filterKeyValue(Cell ignored) throws IOException {
-        return ReturnCode.NEXT_COL;
-      }
-    };
-    txFilterFactory = new TxFilterFactory() {
-      @Override
-      public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
-        return new CustomTxFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter);
-      }
-    };
-    runFilteringTest(txFilterFactory,
-                     ImmutableList.of(Filter.ReturnCode.NEXT_COL,
-                                      Filter.ReturnCode.NEXT_COL,
-                                      Filter.ReturnCode.SKIP,
-                                      Filter.ReturnCode.SKIP,
-                                      Filter.ReturnCode.NEXT_COL,
-                                      Filter.ReturnCode.NEXT_COL));
-
-  }
-
-  private void runFilteringTest(TxFilterFactory txFilterFactory,
-                                List<Filter.ReturnCode> assertCodes) throws Exception {
-
-    /*
-     * Start and stop some transactions.  This will give us a transaction state something like the following
-     * (numbers only reflect ordering, not actual transaction IDs):
-     *   6  - in progress
-     *   5  - committed
-     *   4  - invalid
-     *   3  - in-progress
-     *   2  - committed
-     *   1  - committed
-     *
-     *   read ptr = 5
-     *   write ptr = 6
-     */
-
-    Transaction tx1 = txManager.startShort();
-    assertTrue(txManager.canCommit(tx1, EMPTY_CHANGESET));
-    assertTrue(txManager.commit(tx1));
-
-    Transaction tx2 = txManager.startShort();
-    assertTrue(txManager.canCommit(tx2, EMPTY_CHANGESET));
-    assertTrue(txManager.commit(tx2));
-
-    Transaction tx3 = txManager.startShort();
-    Transaction tx4 = txManager.startShort();
-    txManager.invalidate(tx4.getTransactionId());
-
-    Transaction tx5 = txManager.startShort();
-    assertTrue(txManager.canCommit(tx5, EMPTY_CHANGESET));
-    assertTrue(txManager.commit(tx5));
-
-    Transaction tx6 = txManager.startShort();
-
-    Map<byte[], Long> ttls = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
-    Filter filter = txFilterFactory.getTxFilter(tx6, ttls);
-
-    assertEquals(assertCodes.get(5),
-                 filter.filterKeyValue(newKeyValue("row1", "val1", tx6.getTransactionId())));
-    assertEquals(assertCodes.get(4),
-                 filter.filterKeyValue(newKeyValue("row1", "val1", tx5.getTransactionId())));
-    assertEquals(assertCodes.get(3),
-                 filter.filterKeyValue(newKeyValue("row1", "val1", tx4.getTransactionId())));
-    assertEquals(assertCodes.get(2),
-                 filter.filterKeyValue(newKeyValue("row1", "val1", tx3.getTransactionId())));
-    assertEquals(assertCodes.get(1),
-                 filter.filterKeyValue(newKeyValue("row1", "val1", tx2.getTransactionId())));
-    assertEquals(assertCodes.get(0),
-                 filter.filterKeyValue(newKeyValue("row1", "val1", tx1.getTransactionId())));
-  }
-
-  /**
-   * Test filtering for TTL settings.
-   * @throws Exception
-   */
-  @Test
-  public void testTTLFiltering() throws Exception {
-    Map<byte[], Long> ttls = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
-    ttls.put(FAM, 10L);
-    ttls.put(FAM2, 30L);
-    ttls.put(FAM3, 0L);
-
-    Transaction tx = txManager.startShort();
-    long now = tx.getVisibilityUpperBound();
-    Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN);
-    assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                 filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now)));
-    assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                 filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now - 1 * TxConstants.MAX_TX_PER_MS)));
-    assertEquals(Filter.ReturnCode.NEXT_COL,
-                 filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now - 11 * TxConstants.MAX_TX_PER_MS)));
-    assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                 filter.filterKeyValue(newKeyValue("row1", FAM2, "val1", now - 11 * TxConstants.MAX_TX_PER_MS)));
-    assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                 filter.filterKeyValue(newKeyValue("row1", FAM2, "val1", now - 21 * TxConstants.MAX_TX_PER_MS)));
-    assertEquals(Filter.ReturnCode.NEXT_COL,
-                 filter.filterKeyValue(newKeyValue("row1", FAM2, "val1", now - 31 * TxConstants.MAX_TX_PER_MS)));
-    assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                 filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", now - 31 * TxConstants.MAX_TX_PER_MS)));
-    assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                 filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", now - 1001 * TxConstants.MAX_TX_PER_MS)));
-    assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                 filter.filterKeyValue(newKeyValue("row2", FAM, "val1", now)));
-    assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                 filter.filterKeyValue(newKeyValue("row2", FAM, "val1", now - 1 * TxConstants.MAX_TX_PER_MS)));
-
-    // Verify ttl for pre-existing, non-transactional data
-    long preNow = now / TxConstants.MAX_TX_PER_MS;
-    assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                 filter.filterKeyValue(newKeyValue("row1", FAM, "val1", preNow)));
-    assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                 filter.filterKeyValue(newKeyValue("row1", FAM, "val1", preNow - 9L)));
-    assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                 filter.filterKeyValue(newKeyValue("row1", FAM, "val1", preNow - 10L)));
-    assertEquals(Filter.ReturnCode.NEXT_COL,
-                 filter.filterKeyValue(newKeyValue("row1", FAM, "val1", preNow - 11L)));
-    assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                 filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", preNow)));
-    assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                 filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", preNow - 9L)));
-    assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                 filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", preNow - 10L)));
-    assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
-                 filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", preNow - 1001L)));
-  }
-
-  protected KeyValue newKeyValue(String rowkey, String value, long timestamp) {
-    return new KeyValue(Bytes.toBytes(rowkey), FAM, COL, timestamp, Bytes.toBytes(value));
-  }
-
-  protected KeyValue newKeyValue(String rowkey, byte[] family, String value, long timestamp) {
-    return new KeyValue(Bytes.toBytes(rowkey), family, COL, timestamp, Bytes.toBytes(value));
-  }
-
-  private interface TxFilterFactory {
-    Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs);
-  }
-
-  private class CustomTxFilter extends TransactionVisibilityFilter {
-    public CustomTxFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, ScanType scanType,
-                          @Nullable Filter cellFilter) {
-      super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter);
-    }
-
-    @Override
-    protected ReturnCode determineReturnCode(ReturnCode txFilterCode, ReturnCode subFilterCode) {
-      switch (subFilterCode) {
-        case INCLUDE:
-          return ReturnCode.INCLUDE;
-        case INCLUDE_AND_NEXT_COL:
-          return ReturnCode.INCLUDE_AND_NEXT_COL;
-        case SKIP:
-          return txFilterCode == ReturnCode.INCLUDE ? ReturnCode.SKIP : ReturnCode.NEXT_COL;
-        default:
-          return subFilterCode;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/d00d9f6d/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/HBase98ConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/HBase98ConfigurationProvider.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/HBase98ConfigurationProvider.java
new file mode 100644
index 0000000..5505a9b
--- /dev/null
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/HBase98ConfigurationProvider.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.tephra.util.ConfigurationProvider;
+
+/**
+ * HBase 0.98 version of {@link ConfigurationProvider}.
+ */
+public class HBase98ConfigurationProvider extends ConfigurationProvider {
+  @Override
+  public Configuration get() {
+    return HBaseConfiguration.create();
+  }
+
+  @Override
+  public Configuration get(Configuration baseConf) {
+    return HBaseConfiguration.create(baseConf);
+  }
+}
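
As a usage note for the provider added above: a minimal sketch, assuming hbase-common and this module are on the classpath (the class name and the property looked up below are illustrative only):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.tephra.hbase.HBase98ConfigurationProvider;

    public class ConfigurationProviderSketch {
      public static void main(String[] args) {
        // Build a Configuration with the HBase resources (hbase-default.xml, hbase-site.xml) applied.
        Configuration conf = new HBase98ConfigurationProvider().get();
        // Or layer the HBase resources over an existing base configuration.
        Configuration layered = new HBase98ConfigurationProvider().get(conf);
        System.out.println(layered.get("hbase.zookeeper.quorum"));
      }
    }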

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/d00d9f6d/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/SecondaryIndexTable.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/SecondaryIndexTable.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/SecondaryIndexTable.java
new file mode 100644
index 0000000..7deb600
--- /dev/null
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/SecondaryIndexTable.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tephra.hbase;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionFailureException;
+import org.apache.tephra.distributed.TransactionServiceClient;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A transactional secondary index table: writes to the indexed column of the primary table
+ * are mirrored, in the same transaction, into an accompanying index table.
+ */
+public class SecondaryIndexTable {
+  private byte[] secondaryIndex;
+  private TransactionAwareHTable transactionAwareHTable;
+  private TransactionAwareHTable secondaryIndexTable;
+  private TransactionContext transactionContext;
+  private final TableName secondaryIndexTableName;
+  private static final byte[] secondaryIndexFamily = Bytes.toBytes("secondaryIndexFamily");
+  private static final byte[] secondaryIndexQualifier = Bytes.toBytes('r');
+  private static final byte[] DELIMITER  = new byte[] {0};
+
+  public SecondaryIndexTable(TransactionServiceClient transactionServiceClient, HTableInterface hTable,
+                             byte[] secondaryIndex) {
+    secondaryIndexTableName = TableName.valueOf(hTable.getName().getNameAsString() + ".idx");
+    HTable secondaryIndexHTable = null;
+    HBaseAdmin hBaseAdmin = null;
+    try {
+      hBaseAdmin = new HBaseAdmin(hTable.getConfiguration());
+      if (!hBaseAdmin.tableExists(secondaryIndexTableName)) {
+        // create the index table with the column family that index entries are written to
+        HTableDescriptor indexTableDescriptor = new HTableDescriptor(secondaryIndexTableName);
+        indexTableDescriptor.addFamily(new HColumnDescriptor(secondaryIndexFamily));
+        hBaseAdmin.createTable(indexTableDescriptor);
+      }
+      secondaryIndexHTable = new HTable(hTable.getConfiguration(), secondaryIndexTableName);
+    } catch (Exception e) {
+      Throwables.propagate(e);
+    } finally {
+      try {
+        // the admin may never have been constructed if table creation failed above
+        if (hBaseAdmin != null) {
+          hBaseAdmin.close();
+        }
+      } catch (Exception e) {
+        Throwables.propagate(e);
+      }
+    }
+
+    this.secondaryIndex = secondaryIndex;
+    this.transactionAwareHTable = new TransactionAwareHTable(hTable);
+    this.secondaryIndexTable = new TransactionAwareHTable(secondaryIndexHTable);
+    this.transactionContext = new TransactionContext(transactionServiceClient, transactionAwareHTable,
+                                                     secondaryIndexTable);
+  }
+
+  public Result get(Get get) throws IOException {
+    return get(Collections.singletonList(get))[0];
+  }
+
+  public Result[] get(List<Get> gets) throws IOException {
+    try {
+      transactionContext.start();
+      Result[] result = transactionAwareHTable.get(gets);
+      transactionContext.finish();
+      return result;
+    } catch (Exception e) {
+      try {
+        transactionContext.abort();
+      } catch (TransactionFailureException e1) {
+        throw new IOException("Could not rollback transaction", e1);
+      }
+    }
+    return null;
+  }
+
+  public Result[] getByIndex(byte[] value) throws IOException {
+    try {
+      transactionContext.start();
+      // scan all index rows that start with the given prefix; the stop row must sort
+      // strictly after the start row, since the stop row of a scan is exclusive
+      Scan scan = new Scan(value, Bytes.add(value, new byte[]{(byte) 0xff}));
+      scan.addColumn(secondaryIndexFamily, secondaryIndexQualifier);
+      ResultScanner indexScanner = secondaryIndexTable.getScanner(scan);
+
+      ArrayList<Get> gets = new ArrayList<Get>();
+      for (Result result : indexScanner) {
+        for (Cell cell : result.listCells()) {
+          gets.add(new Get(cell.getValue()));
+        }
+      }
+      Result[] results = transactionAwareHTable.get(gets);
+      transactionContext.finish();
+      return results;
+    } catch (Exception e) {
+      try {
+        transactionContext.abort();
+      } catch (TransactionFailureException e1) {
+        throw new IOException("Could not rollback transaction", e1);
+      }
+    }
+    return null;
+  }
+
+  public void put(Put put) throws IOException {
+    put(Collections.singletonList(put));
+  }
+
+  public void put(List<Put> puts) throws IOException {
+    try {
+      transactionContext.start();
+      ArrayList<Put> secondaryIndexPuts = new ArrayList<Put>();
+      for (Put put : puts) {
+        List<Put> indexPuts = new ArrayList<Put>();
+        Set<Map.Entry<byte[], List<KeyValue>>> familyMap = put.getFamilyMap().entrySet();
+        for (Map.Entry<byte [], List<KeyValue>> family : familyMap) {
+          for (KeyValue value : family.getValue()) {
+            if (Bytes.equals(value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(),
+                             secondaryIndex, 0, secondaryIndex.length)) {
+              byte[] secondaryRow = Bytes.add(value.getQualifier(), DELIMITER,
+                                              Bytes.add(value.getValue(), DELIMITER, value.getRow()));
+              Put indexPut = new Put(secondaryRow);
+              indexPut.add(secondaryIndexFamily, secondaryIndexQualifier, put.getRow());
+              indexPuts.add(indexPut);
+            }
+          }
+        }
+        secondaryIndexPuts.addAll(indexPuts);
+      }
+      transactionAwareHTable.put(puts);
+      secondaryIndexTable.put(secondaryIndexPuts);
+      transactionContext.finish();
+    } catch (Exception e) {
+      try {
+        transactionContext.abort();
+      } catch (TransactionFailureException e1) {
+        throw new IOException("Could not rollback transaction", e1);
+      }
+      throw new IOException("Transactional put failed", e);
+    }
+  }
+}
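
A minimal usage sketch of the class above. The table, the family "f", the indexed
qualifier "email", and the values are hypothetical; a running transaction service and
an existing data table are assumed.

    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.tephra.distributed.TransactionServiceClient;

    public class SecondaryIndexExample {
      public static Result[] writeAndLookup(TransactionServiceClient txClient,
                                            HTable dataTable) throws Exception {
        // index the "email" qualifier of the data table
        SecondaryIndexTable indexed =
            new SecondaryIndexTable(txClient, dataTable, Bytes.toBytes("email"));

        // the data row and its index entry are written in a single transaction
        Put put = new Put(Bytes.toBytes("user1"));
        put.add(Bytes.toBytes("f"), Bytes.toBytes("email"), Bytes.toBytes("alice@example.com"));
        indexed.put(put);

        // scans the index table for the value, then fetches the matching data rows
        return indexed.getByIndex(Bytes.toBytes("alice@example.com"));
      }
    }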

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/d00d9f6d/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
new file mode 100644
index 0000000..f3ad6f8
--- /dev/null
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
@@ -0,0 +1,646 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tephra.hbase;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.OperationWithAttributes;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.tephra.AbstractTransactionAwareTable;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionAware;
+import org.apache.tephra.TxConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+
+/**
+ * A transaction-aware HTable implementation for HBase 0.98. Operations are performed as usual,
+ * but if the transaction fails or is aborted, they are rolled back to the state before the
+ * transaction started.
+ */
+public class TransactionAwareHTable extends AbstractTransactionAwareTable
+    implements HTableInterface, TransactionAware {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTable.class);
+  private final HTableInterface hTable;
+
+  /**
+   * Create a transaction-aware instance of the passed HTable.
+   *
+   * @param hTable underlying HBase table to use
+   */
+  public TransactionAwareHTable(HTableInterface hTable) {
+    this(hTable, false);
+  }
+
+  /**
+   * Create a transaction-aware instance of the passed HTable.
+   *
+   * @param hTable underlying HBase table to use
+   * @param conflictLevel level of conflict detection to perform (defaults to {@code COLUMN})
+   */
+  public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel) {
+    this(hTable, conflictLevel, false);
+  }
+
+  /**
+   * Create a transaction-aware instance of the passed HTable, with the option
+   * of allowing non-transactional operations.
+   * @param hTable underlying HBase table to use
+   * @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete)
+   *                              will be available, though non-transactional
+   */
+  public TransactionAwareHTable(HTableInterface hTable, boolean allowNonTransactional) {
+    this(hTable, TxConstants.ConflictDetection.COLUMN, allowNonTransactional);
+  }
+
+  /**
+   * Create a transaction-aware instance of the passed HTable, with the option
+   * of allowing non-transactional operations.
+   * @param hTable underlying HBase table to use
+   * @param conflictLevel level of conflict detection to perform (defaults to {@code COLUMN})
+   * @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete)
+   *                              will be available, though non-transactional
+   */
+  public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel,
+                                boolean allowNonTransactional) {
+    super(conflictLevel, allowNonTransactional);
+    this.hTable = hTable;
+  }
+
+  /* AbstractTransactionAwareTable implementation */
+
+  @Override
+  protected byte[] getTableKey() {
+    return getTableName();
+  }
+
+  @Override
+  protected boolean doCommit() throws IOException {
+    hTable.flushCommits();
+    return true;
+  }
+
+  @Override
+  protected boolean doRollback() throws Exception {
+    try {
+      // pre-size arraylist of deletes
+      int size = 0;
+      for (Set<ActionChange> cs : changeSets.values()) {
+        size += cs.size();
+      }
+      List<Delete> rollbackDeletes = new ArrayList<>(size);
+      for (Map.Entry<Long, Set<ActionChange>> entry : changeSets.entrySet()) {
+        long transactionTimestamp = entry.getKey();
+        for (ActionChange change : entry.getValue()) {
+          byte[] row = change.getRow();
+          byte[] family = change.getFamily();
+          byte[] qualifier = change.getQualifier();
+          Delete rollbackDelete = new Delete(row);
+          makeRollbackOperation(rollbackDelete);
+          switch (conflictLevel) {
+            case ROW:
+            case NONE:
+              // issue family delete for the tx write pointer
+              rollbackDelete.deleteFamilyVersion(change.getFamily(), transactionTimestamp);
+              break;
+            case COLUMN:
+              if (family != null && qualifier == null) {
+                rollbackDelete.deleteFamilyVersion(family, transactionTimestamp);
+              } else if (family != null && qualifier != null) {
+                rollbackDelete.deleteColumn(family, qualifier, transactionTimestamp);
+              }
+              break;
+            default:
+              throw new IllegalStateException("Unknown conflict detection level: " + conflictLevel);
+          }
+          rollbackDeletes.add(rollbackDelete);
+        }
+      }
+      hTable.delete(rollbackDeletes);
+      return true;
+    } finally {
+      try {
+        hTable.flushCommits();
+      } catch (Exception e) {
+        LOG.error("Could not flush HTable commits", e);
+      }
+      tx = null;
+      changeSets.clear();
+    }
+  }
+
+  /* HTableInterface implementation */
+
+  @Override
+  public byte[] getTableName() {
+    return hTable.getTableName();
+  }
+
+  @Override
+  public TableName getName() {
+    return hTable.getName();
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return hTable.getConfiguration();
+  }
+
+  @Override
+  public HTableDescriptor getTableDescriptor() throws IOException {
+    return hTable.getTableDescriptor();
+  }
+
+  @Override
+  public boolean exists(Get get) throws IOException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    return hTable.exists(transactionalizeAction(get));
+  }
+
+  @Override
+  public Boolean[] exists(List<Get> gets) throws IOException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    List<Get> transactionalizedGets = new ArrayList<Get>(gets.size());
+    for (Get get : gets) {
+      transactionalizedGets.add(transactionalizeAction(get));
+    }
+    return hTable.exists(transactionalizedGets);
+  }
+
+  @Override
+  public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    hTable.batch(transactionalizeActions(actions), results);
+  }
+
+  @Override
+  public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    return hTable.batch(transactionalizeActions(actions));
+  }
+
+  @Override
+  public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) throws
+    IOException, InterruptedException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    hTable.batchCallback(transactionalizeActions(actions), results, callback);
+  }
+
+  @Override
+  public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback) throws IOException,
+    InterruptedException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    return hTable.batchCallback(transactionalizeActions(actions), callback);
+  }
+
+  @Override
+  public Result get(Get get) throws IOException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    return hTable.get(transactionalizeAction(get));
+  }
+
+  @Override
+  public Result[] get(List<Get> gets) throws IOException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    ArrayList<Get> transactionalizedGets = new ArrayList<Get>();
+    for (Get get : gets) {
+      transactionalizedGets.add(transactionalizeAction(get));
+    }
+    return hTable.get(transactionalizedGets);
+  }
+
+  @Override
+  public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
+    if (allowNonTransactional) {
+      return hTable.getRowOrBefore(row, family);
+    } else {
+      throw new UnsupportedOperationException("Operation is not supported transactionally");
+    }
+  }
+
+  @Override
+  public ResultScanner getScanner(Scan scan) throws IOException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    return hTable.getScanner(transactionalizeAction(scan));
+  }
+
+  @Override
+  public ResultScanner getScanner(byte[] family) throws IOException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    Scan scan = new Scan();
+    scan.addFamily(family);
+    return hTable.getScanner(transactionalizeAction(scan));
+  }
+
+  @Override
+  public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    Scan scan = new Scan();
+    scan.addColumn(family, qualifier);
+    return hTable.getScanner(transactionalizeAction(scan));
+  }
+
+  @Override
+  public void put(Put put) throws IOException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    Put txPut = transactionalizeAction(put);
+    hTable.put(txPut);
+  }
+
+  @Override
+  public void put(List<Put> puts) throws IOException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    List<Put> transactionalizedPuts = new ArrayList<Put>(puts.size());
+    for (Put put : puts) {
+      Put txPut = transactionalizeAction(put);
+      transactionalizedPuts.add(txPut);
+    }
+    hTable.put(transactionalizedPuts);
+  }
+
+  @Override
+  public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException {
+    if (allowNonTransactional) {
+      return hTable.checkAndPut(row, family, qualifier, value, put);
+    } else {
+      throw new UnsupportedOperationException("Operation is not supported transactionally");
+    }
+  }
+
+  @Override
+  public void delete(Delete delete) throws IOException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    hTable.delete(transactionalizeAction(delete));
+  }
+
+  @Override
+  public void delete(List<Delete> deletes) throws IOException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    List<Delete> transactionalizedDeletes = new ArrayList<Delete>(deletes.size());
+    for (Delete delete : deletes) {
+      Delete txDelete = transactionalizeAction(delete);
+      transactionalizedDeletes.add(txDelete);
+    }
+    hTable.delete(transactionalizedDeletes);
+  }
+
+  @Override
+  public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete)
+    throws IOException {
+    if (allowNonTransactional) {
+      return hTable.checkAndDelete(row, family, qualifier, value, delete);
+    } else {
+      throw new UnsupportedOperationException("Operation is not supported transactionally");
+    }
+  }
+
+  @Override
+  public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
+                                CompareFilter.CompareOp compareOp, byte[] value, RowMutations rowMutations)
+      throws IOException {
+    if (allowNonTransactional) {
+      return hTable.checkAndMutate(row, family, qualifier, compareOp, value, rowMutations);
+    }
+
+    throw new UnsupportedOperationException("checkAndMutate operation is not supported transactionally");
+  }
+
+  @Override
+  public void mutateRow(RowMutations rm) throws IOException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    // construct with the original row key: the no-arg constructor leaves the row
+    // null, causing add() to reject every mutation
+    RowMutations transactionalMutations = new RowMutations(rm.getRow());
+    for (Mutation mutation : rm.getMutations()) {
+      if (mutation instanceof Put) {
+        transactionalMutations.add(transactionalizeAction((Put) mutation));
+      } else if (mutation instanceof Delete) {
+        transactionalMutations.add(transactionalizeAction((Delete) mutation));
+      }
+    }
+    hTable.mutateRow(transactionalMutations);
+  }
+
+  @Override
+  public Result append(Append append) throws IOException {
+    if (allowNonTransactional) {
+      return hTable.append(append);
+    } else {
+      throw new UnsupportedOperationException("Operation is not supported transactionally");
+    }
+  }
+
+  @Override
+  public Result increment(Increment increment) throws IOException {
+    if (allowNonTransactional) {
+      return hTable.increment(increment);
+    } else {
+      throw new UnsupportedOperationException("Operation is not supported transactionally");
+    }
+  }
+
+  @Override
+  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException {
+    if (allowNonTransactional) {
+      return hTable.incrementColumnValue(row, family, qualifier, amount);
+    } else {
+      throw new UnsupportedOperationException("Operation is not supported transactionally");
+    }
+  }
+
+  @Override
+  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability)
+    throws IOException {
+    if (allowNonTransactional) {
+      return hTable.incrementColumnValue(row, family, qualifier, amount, durability);
+    } else {
+      throw new UnsupportedOperationException("Operation is not supported transactionally");
+    }
+  }
+
+  @Override
+  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
+    throws IOException {
+    if (allowNonTransactional) {
+      return hTable.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
+    } else {
+      throw new UnsupportedOperationException("Operation is not supported transactionally");
+    }
+  }
+
+  @Override
+  public boolean isAutoFlush() {
+    return hTable.isAutoFlush();
+  }
+
+  @Override
+  public void flushCommits() throws IOException {
+    hTable.flushCommits();
+  }
+
+  @Override
+  public void close() throws IOException {
+    hTable.close();
+  }
+
+  @Override
+  public CoprocessorRpcChannel coprocessorService(byte[] row) {
+    return hTable.coprocessorService(row);
+  }
+
+  @Override
+  public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey,
+                                                                  Batch.Call<T, R> callable)
+    throws ServiceException, Throwable {
+    return hTable.coprocessorService(service, startKey, endKey, callable);
+  }
+
+  @Override
+  public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey,
+                                                        Batch.Call<T, R> callable, Batch.Callback<R> callback)
+    throws ServiceException, Throwable {
+    hTable.coprocessorService(service, startKey, endKey, callable, callback);
+  }
+
+  @Override
+  public <R extends Message> Map<byte[], R> batchCoprocessorService(
+      MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey,
+      R responsePrototype) throws ServiceException, Throwable {
+    return hTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype);
+  }
+
+  @Override
+  public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor,
+      Message request, byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
+      throws ServiceException, Throwable {
+    hTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, callback);
+  }
+
+  @Override
+  public void setAutoFlush(boolean autoFlush) {
+    setAutoFlushTo(autoFlush);
+  }
+
+  @Override
+  public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
+    hTable.setAutoFlush(autoFlush, clearBufferOnFail);
+  }
+
+  @Override
+  public void setAutoFlushTo(boolean autoFlush) {
+    hTable.setAutoFlushTo(autoFlush);
+  }
+
+  @Override
+  public long getWriteBufferSize() {
+    return hTable.getWriteBufferSize();
+  }
+
+  @Override
+  public void setWriteBufferSize(long writeBufferSize) throws IOException {
+    hTable.setWriteBufferSize(writeBufferSize);
+  }
+
+  // Helpers to attach transaction metadata to operations: Puts and Deletes are copied with
+  // the timestamp set to the transaction's write pointer, while Gets and Scans are modified
+  // in place.
+
+  private Get transactionalizeAction(Get get) throws IOException {
+    addToOperation(get, tx);
+    return get;
+  }
+
+  private Scan transactionalizeAction(Scan scan) throws IOException {
+    addToOperation(scan, tx);
+    return scan;
+  }
+
+  private Put transactionalizeAction(Put put) throws IOException {
+    Put txPut = new Put(put.getRow(), tx.getWritePointer());
+    Set<Map.Entry<byte[], List<Cell>>> familyMap = put.getFamilyCellMap().entrySet();
+    if (!familyMap.isEmpty()) {
+      for (Map.Entry<byte[], List<Cell>> family : familyMap) {
+        List<Cell> familyValues = family.getValue();
+        if (!familyValues.isEmpty()) {
+          for (Cell value : familyValues) {
+            txPut.add(value.getFamily(), value.getQualifier(), tx.getWritePointer(), value.getValue());
+            addToChangeSet(txPut.getRow(), value.getFamily(), value.getQualifier());
+          }
+        }
+      }
+    }
+    for (Map.Entry<String, byte[]> entry : put.getAttributesMap().entrySet()) {
+      txPut.setAttribute(entry.getKey(), entry.getValue());
+    }
+    txPut.setDurability(put.getDurability());
+    addToOperation(txPut, tx);
+    return txPut;
+  }
+
+  private Delete transactionalizeAction(Delete delete) throws IOException {
+    long transactionTimestamp = tx.getWritePointer();
+
+    byte[] deleteRow = delete.getRow();
+    Delete txDelete = new Delete(deleteRow, transactionTimestamp);
+
+    Map<byte[], List<Cell>> familyToDelete = delete.getFamilyCellMap();
+    if (familyToDelete.isEmpty()) {
+      // perform a row delete if we are using row-level conflict detection
+      if (conflictLevel == TxConstants.ConflictDetection.ROW ||
+          conflictLevel == TxConstants.ConflictDetection.NONE) {
+        // Row delete leaves delete markers in all column families of the table
+        // Therefore get all the column families of the hTable from the HTableDescriptor and add them to the changeSet
+        for (HColumnDescriptor columnDescriptor : hTable.getTableDescriptor().getColumnFamilies()) {
+          // no need to identify individual columns deleted
+          addToChangeSet(deleteRow, columnDescriptor.getName(), null);
+        }
+      } else {
+        Result result = get(new Get(delete.getRow()));
+        // Delete everything
+        NavigableMap<byte[], NavigableMap<byte[], byte[]>> resultMap = result.getNoVersionMap();
+        for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> familyEntry : resultMap.entrySet()) {
+          NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(familyEntry.getKey());
+          for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
+            txDelete.deleteColumns(familyEntry.getKey(), column.getKey(), transactionTimestamp);
+            addToChangeSet(deleteRow, familyEntry.getKey(), column.getKey());
+          }
+        }
+      }
+    } else {
+      for (Map.Entry<byte [], List<Cell>> familyEntry : familyToDelete.entrySet()) {
+        byte[] family = familyEntry.getKey();
+        List<Cell> entries = familyEntry.getValue();
+        boolean isFamilyDelete = false;
+        if (entries.size() == 1) {
+          Cell cell = entries.get(0);
+          isFamilyDelete = CellUtil.isDeleteFamily(cell);
+        }
+        if (isFamilyDelete) {
+          if (conflictLevel == TxConstants.ConflictDetection.ROW ||
+              conflictLevel == TxConstants.ConflictDetection.NONE) {
+            // no need to identify individual columns deleted
+            txDelete.deleteFamily(family);
+            addToChangeSet(deleteRow, family, null);
+          } else {
+            Result result = get(new Get(delete.getRow()).addFamily(family));
+            // Delete entire family
+            NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(family);
+            for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
+              txDelete.deleteColumns(family, column.getKey(), transactionTimestamp);
+              addToChangeSet(deleteRow, family, column.getKey());
+            }
+          }
+        } else {
+          for (Cell value : entries) {
+            txDelete.deleteColumns(value.getFamily(), value.getQualifier(), transactionTimestamp);
+            addToChangeSet(deleteRow, value.getFamily(), value.getQualifier());
+          }
+        }
+      }
+    }
+    for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
+      txDelete.setAttribute(entry.getKey(), entry.getValue());
+    }
+    txDelete.setDurability(delete.getDurability());
+    return txDelete;
+  }
+
+  private List<? extends Row> transactionalizeActions(List<? extends Row> actions) throws IOException {
+    List<Row> transactionalizedActions = new ArrayList<>(actions.size());
+    for (Row action : actions) {
+      if (action instanceof Get) {
+        transactionalizedActions.add(transactionalizeAction((Get) action));
+      } else if (action instanceof Put) {
+        transactionalizedActions.add(transactionalizeAction((Put) action));
+      } else if (action instanceof Delete) {
+        transactionalizedActions.add(transactionalizeAction((Delete) action));
+      } else {
+        transactionalizedActions.add(action);
+      }
+    }
+    return transactionalizedActions;
+  }
+
+  public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException {
+    op.setAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx));
+  }
+
+  protected void makeRollbackOperation(Delete delete) {
+    delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+  }
+}
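
The table participates in a transaction through a TransactionContext that brackets the
operations, as in this minimal sketch (the table, family, and values are illustrative;
a running transaction service is assumed):

    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.tephra.TransactionContext;
    import org.apache.tephra.distributed.TransactionServiceClient;

    public class TransactionAwareHTableExample {
      public static void writeTransactionally(TransactionServiceClient txClient,
                                              HTable hTable) throws Exception {
        TransactionAwareHTable txTable = new TransactionAwareHTable(hTable);
        TransactionContext context = new TransactionContext(txClient, txTable);

        context.start();
        try {
          Put put = new Put(Bytes.toBytes("row1"));
          put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("value"));
          txTable.put(put);   // stamped with the transaction's write pointer
          context.finish();   // checks for conflicts, then commits
        } catch (Exception e) {
          context.abort();    // triggers doRollback() above to delete uncommitted cells
          throw e;
        }
      }
    }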

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/d00d9f6d/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/CellSkipFilter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/CellSkipFilter.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/CellSkipFilter.java
new file mode 100644
index 0000000..67deb32
--- /dev/null
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/CellSkipFilter.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link Filter} that encapsulates another {@link Filter}. It remembers the last {@link KeyValue}
+ * for which the underlying filter returned {@link ReturnCode#NEXT_COL} or
+ * {@link ReturnCode#INCLUDE_AND_NEXT_COL}, so that when {@link #filterKeyValue} is called again
+ * for another version of the same column, it returns {@link ReturnCode#NEXT_COL} directly
+ * without consulting the underlying {@link Filter}.
+ * Please see TEPHRA-169 for more details.
+ */
+public class CellSkipFilter extends FilterBase {
+  private final Filter filter;
+  // remember the previous keyvalue processed by filter when the return code was NEXT_COL or INCLUDE_AND_NEXT_COL
+  private KeyValue skipColumn = null;
+
+  public CellSkipFilter(Filter filter) {
+    this.filter = filter;
+  }
+
+  /**
+   * Determines whether the current cell should be skipped. The cell is skipped if the
+   * previously remembered keyvalue has the same row, family, and qualifier, meaning the
+   * underlying filter already responded for this column with ReturnCode.NEXT_COL or
+   * ReturnCode.INCLUDE_AND_NEXT_COL.
+   * @param cell the {@link Cell} to be tested for skipping
+   * @return true if the current cell should be skipped, false otherwise
+   */
+  private boolean skipCellVersion(Cell cell) {
+    return skipColumn != null
+      && skipColumn.matchingRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
+      && skipColumn.matchingColumn(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+                                     cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+  }
+
+  @Override
+  public ReturnCode filterKeyValue(Cell cell) throws IOException {
+    if (skipCellVersion(cell)) {
+      return ReturnCode.NEXT_COL;
+    }
+
+    ReturnCode code = filter.filterKeyValue(cell);
+    if (code == ReturnCode.NEXT_COL || code == ReturnCode.INCLUDE_AND_NEXT_COL) {
+      // only store the reference to the keyvalue if we are returning NEXT_COL or INCLUDE_AND_NEXT_COL
+      skipColumn = KeyValue.createFirstOnRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+                                               cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+                                               cell.getQualifierArray(), cell.getQualifierOffset(),
+                                               cell.getQualifierLength());
+    } else {
+      skipColumn = null;
+    }
+    return code;
+  }
+
+  @Override
+  public boolean filterRow() throws IOException {
+    return filter.filterRow();
+  }
+
+  @Override
+  public Cell transformCell(Cell cell) throws IOException {
+    return filter.transformCell(cell);
+  }
+
+  @Override
+  public void reset() throws IOException {
+    filter.reset();
+  }
+
+  @Override
+  public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
+    return filter.filterRowKey(buffer, offset, length);
+  }
+
+  @Override
+  public boolean filterAllRemaining() throws IOException {
+    return filter.filterAllRemaining();
+  }
+
+  @Override
+  public void filterRowCells(List<Cell> kvs) throws IOException {
+    filter.filterRowCells(kvs);
+  }
+
+  @Override
+  public boolean hasFilterRow() {
+    return filter.hasFilterRow();
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException {
+    return filter.getNextKeyHint(currentKV);
+  }
+
+  @Override
+  public Cell getNextCellHint(Cell currentKV) throws IOException {
+    return filter.getNextCellHint(currentKV);
+  }
+
+  @Override
+  public boolean isFamilyEssential(byte[] name) throws IOException {
+    return filter.isFamilyEssential(name);
+  }
+
+  @Override
+  public byte[] toByteArray() throws IOException {
+    return filter.toByteArray();
+  }
+}
+
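
Wiring the wrapper is a one-liner around any per-cell filter, as in this sketch. The
trivial inner filter is illustrative; in Tephra the wrapped filter is the transaction
visibility filter installed by the coprocessor.

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.FilterBase;

    public class CellSkipFilterExample {
      /** Illustrative filter that keeps only the newest version of every column. */
      static class NewestVersionFilter extends FilterBase {
        @Override
        public ReturnCode filterKeyValue(Cell cell) {
          // CellSkipFilter remembers this answer and skips the remaining versions of
          // the column without calling back into this filter
          return ReturnCode.INCLUDE_AND_NEXT_COL;
        }
      }

      public static Scan newScan() {
        Scan scan = new Scan();
        scan.setMaxVersions();
        scan.setFilter(new CellSkipFilter(new NewestVersionFilter()));
        return scan;
      }
    }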