You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/09/08 08:10:03 UTC

[4/7] incubator-tephra git commit: TEPHRA-176, TEPHRA-177: Adding maven modules for CDH-5.7, 5.8 support, HBase-1.1 and HBase-1.2 modules

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2246abff/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
new file mode 100644
index 0000000..4a694eb
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor;
+
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MockRegionServerServices;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.tephra.ChangeId;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.TransactionStateCache;
+import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.HDFSTransactionStateStorage;
+import org.apache.tephra.persist.TransactionSnapshot;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.apache.tephra.snapshot.DefaultSnapshotCodec;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+import org.apache.tephra.util.TxUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests filtering of invalid transaction data by the {@link TransactionProcessor} coprocessor.
+ */
+public class TransactionProcessorTest {
+  private static final Logger LOG = LoggerFactory.getLogger(TransactionProcessorTest.class);
+
+  // Version timestamps V[0]..V[8] (nine entries), one hour apart: V[0] lies 8 hours in the past and
+  // V[8] is the current time. Every entry is scaled by TxConstants.MAX_TX_PER_MS.
+  private static final long[] V;
+
+  static {
+    long now = System.currentTimeMillis();
+    V = new long[9];
+    for (int i = 0; i < V.length; i++) {
+      // entry i lies (8 - i) hours before "now"
+      V[i] = (now - TimeUnit.HOURS.toMillis(8 - i)) * TxConstants.MAX_TX_PER_MS;
+    }
+  }
+
+  // temporary directory used by setupBeforeClass() for the mini DFS cluster and the snapshot directory
+  @ClassRule
+  public static TemporaryFolder tmpFolder = new TemporaryFolder();
+  // created in setupBeforeClass(), shut down in shutdownAfterClass()
+  private static MiniDFSCluster dfsCluster;
+  // shared configuration, built in setupBeforeClass() from the mini DFS cluster's file system
+  private static Configuration conf;
+  // invalid transaction ids stored in the snapshot written by setupBeforeClass();
+  // must stay declared after V, because it reads V during static initialization
+  private static LongArrayList invalidSet = new LongArrayList(new long[]{V[3], V[5], V[7]});
+  // visibility state built in setupBeforeClass() from the same values as the written snapshot
+  private static TransactionVisibilityState txVisibilityState;
+
+  /**
+   * Starts a single-node mini DFS cluster, derives the shared test configuration from it, and writes an
+   * initial transaction snapshot to the configured snapshot directory.
+   *
+   * @throws Exception if the cluster cannot be started or the snapshot cannot be written
+   */
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    Configuration hConf = new Configuration();
+    String rootDir = tmpFolder.newFolder().getAbsolutePath();
+    hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, rootDir);
+    hConf.set(HConstants.HBASE_DIR, rootDir + "/hbase");
+
+    dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build();
+    dfsCluster.waitActive();
+    conf = HBaseConfiguration.create(dfsCluster.getFileSystem().getConf());
+
+    conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
+    conf.unset(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES);
+    String localTestDir = tmpFolder.newFolder().getAbsolutePath();
+    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, localTestDir);
+    conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
+
+    // write an initial transaction snapshot
+    TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom(
+        System.currentTimeMillis(), V[6] - 1, V[7], invalidSet,
+        // this will set visibility upper bound to V[6]
+        Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(V[6] - 1, Long.MAX_VALUE,
+                                                                                        TransactionType.SHORT))),
+        new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>());
+    txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(),
+                                                txSnapshot.getWritePointer(), txSnapshot.getInvalid(),
+                                                txSnapshot.getInProgress());
+    HDFSTransactionStateStorage tmpStorage =
+      new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
+    tmpStorage.startAndWait();
+    try {
+      tmpStorage.writeSnapshot(txSnapshot);
+    } finally {
+      // always stop the storage service, even when writing the snapshot fails
+      tmpStorage.stopAndWait();
+    }
+  }
+
+  /**
+   * Shuts down the mini DFS cluster started in {@link #setupBeforeClass()}.
+   *
+   * @throws Exception if the cluster fails to shut down
+   */
+  @AfterClass
+  public static void shutdownAfterClass() throws Exception {
+    // the cluster is still null when setupBeforeClass() failed before creating it;
+    // skip the shutdown then instead of adding a NullPointerException to the reported failures
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+    }
+  }
+
+  /**
+   * Writes rows 1..8, where row i holds the versions V[1]..V[i], flushes the region, and checks with an
+   * unfiltered scan which versions each row still contains.
+   */
+  @Test
+  public void testDataJanitorRegionScanner() throws Exception {
+    String tableName = "TestRegionScanner";
+    byte[] familyBytes = Bytes.toBytes("f");
+    byte[] columnBytes = Bytes.toBytes("c");
+    HRegion region = createRegion(tableName, familyBytes, TimeUnit.HOURS.toMillis(3));
+    try {
+      region.initialize();
+      TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
+      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+
+      for (int i = 1; i <= 8; i++) {
+        for (int k = 1; k <= i; k++) {
+          Put p = new Put(Bytes.toBytes(i));
+          p.add(familyBytes, columnBytes, V[k], Bytes.toBytes(V[k]));
+          region.put(p);
+        }
+      }
+
+      List<Cell> results = Lists.newArrayList();
+
+      // force a flush to clear the data
+      // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
+
+      LOG.info("Flushing region " + region.getRegionInfo().getRegionNameAsString());
+      region.flushcache(true, false);
+
+      // now a normal scan should only return the valid rows
+      // do not use a filter here to test that cleanup works on flush
+      Scan scan = new Scan();
+      scan.setMaxVersions(10);
+      // try-with-resources releases the scanner even when one of the assertions below fails
+      try (RegionScanner regionScanner = region.getScanner(scan)) {
+        // first returned value should be "4" with version "4"
+        results.clear();
+        assertTrue(regionScanner.next(results));
+        assertKeyValueMatches(results, 4, new long[]{V[4]});
+
+        results.clear();
+        assertTrue(regionScanner.next(results));
+        assertKeyValueMatches(results, 5, new long[] {V[4]});
+
+        results.clear();
+        assertTrue(regionScanner.next(results));
+        assertKeyValueMatches(results, 6, new long[]{V[6], V[4]});
+
+        results.clear();
+        assertTrue(regionScanner.next(results));
+        assertKeyValueMatches(results, 7, new long[]{V[6], V[4]});
+
+        // row 8 is the last one, so this call must report that nothing is left to scan
+        results.clear();
+        assertFalse(regionScanner.next(results));
+        assertKeyValueMatches(results, 8, new long[] {V[8], V[6], V[4]});
+      }
+    } finally {
+      region.close();
+    }
+  }
+
+  /**
+   * Writes the versions V[4]..V[8] of a single cell, issues a column delete at V[5] + 1, flushes the
+   * region, and checks with an unfiltered scan which cells are left.
+   */
+  @Test
+  public void testDeleteFiltering() throws Exception {
+    String tableName = "TestDeleteFiltering";
+    byte[] familyBytes = Bytes.toBytes("f");
+    byte[] columnBytes = Bytes.toBytes("c");
+    HRegion region = createRegion(tableName, familyBytes, 0);
+    try {
+      region.initialize();
+      TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
+      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+
+      byte[] row = Bytes.toBytes(1);
+      for (int i = 4; i < V.length; i++) {
+        Put p = new Put(row);
+        p.add(familyBytes, columnBytes, V[i], Bytes.toBytes(V[i]));
+        region.put(p);
+      }
+
+      // delete V[5] and everything older
+      // take that cell's timestamp + 1 to simulate a delete in a new tx
+      long deleteTs = V[5] + 1;
+      Delete d = new Delete(row, deleteTs);
+      LOG.info("Issuing delete at timestamp " + deleteTs);
+      // row deletes are not yet supported (TransactionAwareHTable normally handles this)
+      d.deleteColumns(familyBytes, columnBytes);
+      region.delete(d);
+
+      List<Cell> results = Lists.newArrayList();
+
+      // force a flush to clear the data
+      // during flush, we should drop the deleted version, but not the others
+      LOG.info("Flushing region " + region.getRegionInfo().getRegionNameAsString());
+      region.flushcache(true, false);
+
+      // now a normal scan should return the row with versions at V[8] and V[6], followed by a cell at
+      // deleteTs that carries an empty value.
+      // V[7] is invalid and V[5] and prior are deleted.
+      Scan scan = new Scan();
+      scan.setMaxVersions(10);
+      // try-with-resources releases the scanner even when one of the assertions below fails
+      try (RegionScanner regionScanner = region.getScanner(scan)) {
+        // should be only one row
+        assertFalse(regionScanner.next(results));
+        assertKeyValueMatches(results, 1,
+            new long[]{V[8], V[6], deleteTs},
+            new byte[][]{Bytes.toBytes(V[8]), Bytes.toBytes(V[6]), new byte[0]});
+      }
+    } finally {
+      region.close();
+    }
+  }
+
+  /**
+   * Writes five columns to one row, marks the odd ones as deleted (puts with empty values), and checks
+   * that only the even columns are returned: by a filtered scan before and after a flush, and by an
+   * unfiltered scan after a major compaction.
+   */
+  @Test
+  public void testDeleteMarkerCleanup() throws Exception {
+    String tableName = "TestDeleteMarkerCleanup";
+    byte[] familyBytes = Bytes.toBytes("f");
+    HRegion region = createRegion(tableName, familyBytes, 0);
+    try {
+      region.initialize();
+
+      // all puts use a timestamp before the tx snapshot's visibility upper bound, making them eligible for removal
+      long writeTs = txVisibilityState.getVisibilityUpperBound() - 10;
+      // deletes are performed after the writes, but still before the visibility upper bound
+      long deleteTs = writeTs + 1;
+      // write separate columns to confirm that delete markers survive across flushes
+      byte[] row = Bytes.toBytes(100);
+      Put p = new Put(row);
+
+      LOG.info("Writing columns at timestamp " + writeTs);
+      for (int i = 0; i < 5; i++) {
+        byte[] iBytes = Bytes.toBytes(i);
+        p.add(familyBytes, iBytes, writeTs, iBytes);
+      }
+      region.put(p);
+      // read all back
+      assertRowColumns(scanSingleRow(region, new Scan(row)), row, 0, 1, 2, 3, 4);
+
+      // force a flush to clear the memstore
+      LOG.info("Before delete, flushing region " + region.getRegionInfo().getRegionNameAsString());
+      region.flushcache(false, false);
+
+      // delete the odd entries
+      for (int i = 1; i < 5; i += 2) {
+        // deletes are performed as puts with empty values
+        Put deletePut = new Put(row);
+        deletePut.add(familyBytes, Bytes.toBytes(i), deleteTs, new byte[0]);
+        region.put(deletePut);
+      }
+
+      // read all back; only even columns should exist
+      Scan scan = new Scan(row);
+      scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState),
+                                                            new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN));
+      assertRowColumns(scanSingleRow(region, scan), row, 0, 2, 4);
+
+      // force another flush on the delete markers
+      // during flush, we should retain the delete markers, since they can only safely be dropped by a major compaction
+      LOG.info("After delete, flushing region " + region.getRegionInfo().getRegionNameAsString());
+      region.flushcache(true, false);
+
+      // only even columns should exist
+      scan = new Scan(row);
+      scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState),
+                                                            new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN));
+      assertRowColumns(scanSingleRow(region, scan), row, 0, 2, 4);
+
+      // force a major compaction
+      LOG.info("Forcing major compaction of region " + region.getRegionInfo().getRegionNameAsString());
+      region.compact(true);
+
+      // perform a raw scan (no filter) to confirm that the delete markers are now gone;
+      // only even columns should exist
+      assertRowColumns(scanSingleRow(region, new Scan(row)), row, 0, 2, 4);
+    } finally {
+      region.close();
+    }
+  }
+
+  /**
+   * Opens a scanner for {@code scan}, asserts that a single {@code next()} call reports no further rows,
+   * and closes the scanner before returning the collected cells.
+   */
+  private List<Cell> scanSingleRow(HRegion region, Scan scan) throws IOException {
+    List<Cell> results = Lists.newArrayList();
+    try (RegionScanner regionScanner = region.getScanner(scan)) {
+      assertFalse(regionScanner.next(results));
+    }
+    return results;
+  }
+
+  /**
+   * Asserts that {@code results} holds exactly one cell per entry of {@code columns}, in that order, where
+   * every cell belongs to {@code row} and uses the encoded column number as both qualifier and value.
+   */
+  private void assertRowColumns(List<Cell> results, byte[] row, int... columns) {
+    assertEquals(columns.length, results.size());
+    for (int i = 0; i < columns.length; i++) {
+      Cell cell = results.get(i);
+      LOG.info("Got cell " + cell);
+      assertArrayEquals(row, cell.getRow());
+      byte[] idxBytes = Bytes.toBytes(columns[i]);
+      assertArrayEquals(idxBytes, cell.getQualifier());
+      assertArrayEquals(idxBytes, cell.getValue());
+    }
+  }
+
+  /**
+   * Test that we correctly preserve the timestamp set for column family delete markers.  This is not
+   * directly required for the TransactionAwareHTable usage, but is the right thing to do and ensures
+   * that we make it easy to interoperate with other systems.
+   */
+  @Test
+  public void testFamilyDeleteTimestamp() throws Exception {
+    String tableName = "TestFamilyDeleteTimestamp";
+    byte[] family1Bytes = Bytes.toBytes("f1");
+    byte[] columnBytes = Bytes.toBytes("c");
+    byte[] rowBytes = Bytes.toBytes("row");
+    byte[] valBytes = Bytes.toBytes("val");
+    HRegion region = createRegion(tableName, family1Bytes, 0);
+    try {
+      region.initialize();
+
+      long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS;
+      Put p = new Put(rowBytes);
+      p.add(family1Bytes, columnBytes, now - 10, valBytes);
+      region.put(p);
+
+      // issue a family delete with an explicit timestamp
+      Delete delete = new Delete(rowBytes, now);
+      delete.deleteFamily(family1Bytes, now - 5);
+      region.delete(delete);
+
+      // test that the delete marker preserved the timestamp
+      Scan scan = new Scan();
+      scan.setMaxVersions();
+      List<Cell> results = Lists.newArrayList();
+      // try-with-resources closes the scanner on the failure path too, not only after all assertions pass
+      try (RegionScanner scanner = region.getScanner(scan)) {
+        scanner.next(results);
+        assertEquals(2, results.size());
+        // delete marker should appear first
+        Cell cell = results.get(0);
+        assertArrayEquals(new byte[0], cell.getQualifier());
+        assertArrayEquals(new byte[0], cell.getValue());
+        assertEquals(now - 5, cell.getTimestamp());
+        // since this is an unfiltered scan against the region, the original put should be next
+        cell = results.get(1);
+        assertArrayEquals(valBytes, cell.getValue());
+        assertEquals(now - 10, cell.getTimestamp());
+      }
+
+      // with a filtered scan the original put should disappear
+      scan = new Scan();
+      scan.setMaxVersions();
+      scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState),
+                                                            new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN));
+      results = Lists.newArrayList();
+      try (RegionScanner scanner = region.getScanner(scan)) {
+        scanner.next(results);
+        assertEquals(0, results.size());
+      }
+    } finally {
+      region.close();
+    }
+  }
+
+  /**
+   * Writes cells with plain millisecond timestamps (pre-existing, non-transactional data) next to cells
+   * with timestamps scaled by {@code TxConstants.MAX_TX_PER_MS} (transactional data), then checks what raw
+   * and transactional scans return across flushes, major compactions and two TTL reductions.
+   */
+  @Test
+  public void testPreExistingData() throws Exception {
+    String tableName = "TestPreExistingData";
+    byte[] familyBytes = Bytes.toBytes("f");
+    long ttlMillis = TimeUnit.DAYS.toMillis(14);
+    HRegion region = createRegion(tableName, familyBytes, ttlMillis);
+    try {
+      region.initialize();
+
+      // timestamps for pre-existing, non-transactional data
+      long now = txVisibilityState.getVisibilityUpperBound() / TxConstants.MAX_TX_PER_MS;
+      long older = now - ttlMillis / 2;
+      long newer = now - ttlMillis / 3;
+      // timestamps for transactional data
+      long nowTx = txVisibilityState.getVisibilityUpperBound();
+      long olderTx = nowTx - (ttlMillis / 2) * TxConstants.MAX_TX_PER_MS;
+      long newerTx = nowTx - (ttlMillis / 3) * TxConstants.MAX_TX_PER_MS;
+
+      Map<byte[], Long> ttls = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+      ttls.put(familyBytes, ttlMillis);
+
+      // even indexes (0, 2, 4) carry the older timestamps, odd indexes (1, 3, 5) the newer ones
+      List<Cell> cells = new ArrayList<>();
+      cells.add(new KeyValue(Bytes.toBytes("r1"), familyBytes, Bytes.toBytes("c1"), older, Bytes.toBytes("v11")));
+      cells.add(new KeyValue(Bytes.toBytes("r1"), familyBytes, Bytes.toBytes("c2"), newer, Bytes.toBytes("v12")));
+      cells.add(new KeyValue(Bytes.toBytes("r2"), familyBytes, Bytes.toBytes("c1"), older, Bytes.toBytes("v21")));
+      cells.add(new KeyValue(Bytes.toBytes("r2"), familyBytes, Bytes.toBytes("c2"), newer, Bytes.toBytes("v22")));
+      cells.add(new KeyValue(Bytes.toBytes("r3"), familyBytes, Bytes.toBytes("c1"), olderTx, Bytes.toBytes("v31")));
+      cells.add(new KeyValue(Bytes.toBytes("r3"), familyBytes, Bytes.toBytes("c2"), newerTx, Bytes.toBytes("v32")));
+
+      // Write non-transactional and transactional data
+      for (Cell c : cells) {
+        region.put(new Put(c.getRow()).add(c.getFamily(), c.getQualifier(), c.getTimestamp(), c.getValue()));
+      }
+
+      Scan rawScan = new Scan();
+      rawScan.setMaxVersions();
+
+      Transaction dummyTransaction = TxUtils.createDummyTransaction(txVisibilityState);
+      Scan txScan = new Scan();
+      txScan.setMaxVersions();
+      txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true),
+                          TxUtils.getMaxVisibleTimestamp(dummyTransaction));
+      txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN));
+
+      // read all back with raw scanner
+      scanAndAssert(region, cells, rawScan);
+
+      // read all back with transaction filter
+      scanAndAssert(region, cells, txScan);
+
+      // force a flush to clear the memstore
+      region.flushcache(true, false);
+      scanAndAssert(region, cells, txScan);
+
+      // force a major compaction to remove any expired cells
+      region.compact(true);
+      scanAndAssert(region, cells, txScan);
+
+      // Reduce TTL, this should make cells with timestamps older and olderTx expire
+      long newTtl = ttlMillis / 2 - 1;
+      // updateTtl() closes the region and returns a reopened one, so the reference must be replaced
+      region = updateTtl(region, familyBytes, newTtl);
+      ttls.put(familyBytes, newTtl);
+      txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true),
+                          TxUtils.getMaxVisibleTimestamp(dummyTransaction));
+      txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN));
+
+      // Raw scan should still give all cells
+      scanAndAssert(region, cells, rawScan);
+      // However, tx scan should not return expired cells
+      scanAndAssert(region, select(cells, 1, 3, 5), txScan);
+
+      region.flushcache(true, false);
+      scanAndAssert(region, cells, rawScan);
+
+      // force a major compaction to remove any expired cells
+      region.compact(true);
+      // This time raw scan too should not return expired cells, as they would be dropped during major compaction
+      scanAndAssert(region, select(cells, 1, 3, 5), rawScan);
+
+      // Reduce TTL again to 1 ms, this should expire all cells
+      newTtl = 1;
+      region = updateTtl(region, familyBytes, newTtl);
+      ttls.put(familyBytes, newTtl);
+      // NOTE(review): txScan is reconfigured here, but none of the remaining assertions uses it
+      txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true),
+                          TxUtils.getMaxVisibleTimestamp(dummyTransaction));
+      txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN));
+
+      // force a major compaction to remove expired cells
+      region.compact(true);
+      // This time raw scan should not return any cells, as all cells have expired.
+      scanAndAssert(region, Collections.<Cell>emptyList(), rawScan);
+    } finally {
+      region.close();
+    }
+  }
+
+  /**
+   * Returns a new list holding the elements of {@code cells} at the given positions, in the order in
+   * which the positions are listed.
+   */
+  private List<Cell> select(List<Cell> cells, int... indexes) {
+    List<Cell> picked = new ArrayList<>(indexes.length);
+    for (int pos = 0; pos < indexes.length; pos++) {
+      picked.add(cells.get(indexes[pos]));
+    }
+    return picked;
+  }
+
+  /**
+   * Runs {@code scan} against {@code region}, drains the scanner into a single list, and asserts that the
+   * collected cells equal {@code expected}. The scanner is closed afterwards.
+   */
+  private void scanAndAssert(HRegion region, List<Cell> expected, Scan scan) throws Exception {
+    try (RegionScanner scanner = region.getScanner(scan)) {
+      List<Cell> collected = Lists.newArrayList();
+      boolean moreRows;
+      do {
+        // every call appends the cells of the next row to the same list
+        moreRows = scanner.next(collected);
+      } while (moreRows);
+      assertEquals(expected, collected);
+    }
+  }
+
+  /**
+   * Closes {@code region}, applies {@code ttl} (only when positive) and a maximum of 10 versions to the
+   * given column family, and reopens the region with the changed table descriptor.
+   *
+   * @return the reopened region
+   */
+  private HRegion updateTtl(HRegion region, byte[] family, long ttl) throws Exception {
+    region.close();
+
+    HTableDescriptor tableDescriptor = region.getTableDesc();
+    HColumnDescriptor familyDescriptor = tableDescriptor.getFamily(family);
+    familyDescriptor.setMaxVersions(10);
+    if (ttl > 0) {
+      familyDescriptor.setValue(TxConstants.PROPERTY_TTL, String.valueOf(ttl));
+    }
+
+    ServerName localServer =
+      ServerName.valueOf(InetAddress.getLocalHost().getHostName(), 0, System.currentTimeMillis());
+    LocalRegionServerServices services = new LocalRegionServerServices(conf, localServer);
+    return HRegion.openHRegion(region.getRegionInfo(), tableDescriptor, region.getWAL(), conf, services, null);
+  }
+
+  /**
+   * Builds a region for a new table with one column family (at most 10 versions, plus {@code ttl} when it
+   * is positive) and the {@link TransactionProcessor} coprocessor attached. The table directory, WAL and
+   * region directory are created first; the returned region has not been initialized yet.
+   */
+  private HRegion createRegion(String tableName, byte[] family, long ttl) throws IOException {
+    TableName table = TableName.valueOf(tableName);
+
+    HColumnDescriptor familyDescriptor = new HColumnDescriptor(family);
+    if (ttl > 0) {
+      familyDescriptor.setValue(TxConstants.PROPERTY_TTL, String.valueOf(ttl));
+    }
+    familyDescriptor.setMaxVersions(10);
+
+    HTableDescriptor tableDescriptor = new HTableDescriptor(table);
+    tableDescriptor.addFamily(familyDescriptor);
+    tableDescriptor.addCoprocessor(TransactionProcessor.class.getName());
+
+    Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
+    FileSystem fileSystem = FileSystem.get(conf);
+    assertTrue(fileSystem.mkdirs(tableDir));
+
+    WAL wal = new WALFactory(conf, null, tableName + ".hlog").getWAL(new byte[]{1});
+    HRegionFileSystem regionFileSystem =
+      HRegionFileSystem.createRegionOnFileSystem(conf, fileSystem, tableDir, new HRegionInfo(table));
+
+    ServerName localServer =
+      ServerName.valueOf(InetAddress.getLocalHost().getHostName(), 0, System.currentTimeMillis());
+    return new HRegion(regionFileSystem, wal, conf, tableDescriptor,
+                       new LocalRegionServerServices(conf, localServer));
+  }
+
+  /**
+   * Variant for cells whose value is the encoded form of their own timestamp: derives the expected
+   * values from {@code versions} and delegates to the four-argument overload.
+   */
+  private void assertKeyValueMatches(List<Cell> results, int index, long[] versions) {
+    byte[][] expectedValues = new byte[versions.length][];
+    int pos = 0;
+    for (long version : versions) {
+      expectedValues[pos++] = Bytes.toBytes(version);
+    }
+    assertKeyValueMatches(results, index, versions, expectedValues);
+  }
+
+  /**
+   * Asserts that {@code results} has as many cells as there are expected versions and values, and that the
+   * i-th cell has the row key {@code Bytes.toBytes(index)}, the timestamp {@code versions[i]} and the value
+   * {@code values[i]}.
+   */
+  private void assertKeyValueMatches(List<Cell> results, int index, long[] versions, byte[][] values) {
+    int cellCount = results.size();
+    assertEquals(versions.length, cellCount);
+    assertEquals(values.length, cellCount);
+
+    byte[] expectedRow = Bytes.toBytes(index);
+    int pos = 0;
+    for (Cell cell : results) {
+      assertArrayEquals(expectedRow, cell.getRow());
+      assertEquals(versions[pos], cell.getTimestamp());
+      assertArrayEquals(values[pos], cell.getValue());
+      pos++;
+    }
+  }
+
+  /**
+   * Starts a {@link TransactionStateCache} on the shared configuration and checks that the state it loads
+   * carries the invalid set written in {@link #setupBeforeClass()}.
+   */
+  @Test
+  public void testTransactionStateCache() throws Exception {
+    TransactionStateCache cache = new TransactionStateCache();
+    cache.setConf(conf);
+    cache.startAndWait();
+    try {
+      // verify that the transaction snapshot read matches what we wrote in setupBeforeClass()
+      TransactionVisibilityState cachedSnapshot = cache.getLatestState();
+      assertNotNull(cachedSnapshot);
+      assertEquals(invalidSet, cachedSnapshot.getInvalid());
+    } finally {
+      // stop the cache service even when one of the assertions above fails
+      cache.stopAndWait();
+    }
+  }
+
+  /**
+   * {@link MockRegionServerServices} that reports the {@link ServerName} handed to its constructor.
+   */
+  private static class LocalRegionServerServices extends MockRegionServerServices {
+    // name returned by getServerName()
+    private final ServerName reportedName;
+
+    public LocalRegionServerServices(Configuration conf, ServerName serverName) {
+      super(conf);
+      reportedName = serverName;
+    }
+
+    @Override
+    public ServerName getServerName() {
+      return reportedName;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2246abff/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
new file mode 100644
index 0000000..d976085
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.AbstractTransactionVisibilityFilterTest;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for the filtering logic applied when reading data transactionally (HBase 1.1 and 1.2 compat modules).
+ */
+public class TransactionVisibilityFilterTest extends AbstractTransactionVisibilityFilterTest {
+  /**
+   * Test filtering of KeyValues written by committed, in-progress and invalid transactions when no
+   * sub-filter is configured.
+   * @throws Exception if the transaction manager or the filter fails unexpectedly
+   */
+  @Test
+  public void testFiltering() throws Exception {
+    TxFilterFactory plainFactory = new TxFilterFactory() {
+      @Override
+      public Filter getTxFilter(Transaction reader, Map<byte[], Long> ttls) {
+        return new TransactionVisibilityFilter(reader, ttls, false, ScanType.USER_SCAN);
+      }
+    };
+    // Expected codes are listed oldest writer first: committed, committed, in progress, invalid,
+    // committed, and finally the reading transaction itself.
+    runFilteringTest(plainFactory, ImmutableList.of(
+        Filter.ReturnCode.INCLUDE_AND_NEXT_COL, Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+        Filter.ReturnCode.SKIP, Filter.ReturnCode.SKIP,
+        Filter.ReturnCode.INCLUDE_AND_NEXT_COL, Filter.ReturnCode.INCLUDE_AND_NEXT_COL));
+  }
+
+  @Test
+  public void testSubFilter() throws Exception {
+    // Case 1: the sub-filter answers INCLUDE for every cell. Cells from committed writers and from the
+    // reading transaction itself are expected to come back as INCLUDE_AND_NEXT_COL; cells from the
+    // in-progress and the invalid writer are expected to be SKIP here and in every case below.
+    final FilterBase alwaysInclude = new FilterBase() {
+      @Override
+      public ReturnCode filterKeyValue(Cell ignored) throws IOException {
+        return ReturnCode.INCLUDE;
+      }
+    };
+    TxFilterFactory includeFactory = new TxFilterFactory() {
+      @Override
+      public Filter getTxFilter(Transaction reader, Map<byte[], Long> ttls) {
+        return new TransactionVisibilityFilter(reader, ttls, false, ScanType.USER_SCAN, alwaysInclude);
+      }
+    };
+    runFilteringTest(includeFactory, ImmutableList.of(
+        Filter.ReturnCode.INCLUDE_AND_NEXT_COL, Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+        Filter.ReturnCode.SKIP, Filter.ReturnCode.SKIP,
+        Filter.ReturnCode.INCLUDE_AND_NEXT_COL, Filter.ReturnCode.INCLUDE_AND_NEXT_COL));
+
+    // Case 2: the sub-filter answers SKIP for every cell. Cells that the transaction filter would
+    // otherwise include are expected to come back as NEXT_COL, so nothing is returned for them and
+    // the scan moves on to the next column.
+    final Filter alwaysSkip = new FilterBase() {
+      @Override
+      public ReturnCode filterKeyValue(Cell ignored) throws IOException {
+        return ReturnCode.SKIP;
+      }
+    };
+    TxFilterFactory skipFactory = new TxFilterFactory() {
+      @Override
+      public Filter getTxFilter(Transaction reader, Map<byte[], Long> ttls) {
+        return new TransactionVisibilityFilter(reader, ttls, false, ScanType.USER_SCAN, alwaysSkip);
+      }
+    };
+    runFilteringTest(skipFactory, ImmutableList.of(
+        Filter.ReturnCode.NEXT_COL, Filter.ReturnCode.NEXT_COL,
+        Filter.ReturnCode.SKIP, Filter.ReturnCode.SKIP,
+        Filter.ReturnCode.NEXT_COL, Filter.ReturnCode.NEXT_COL));
+
+    // Case 3: the sub-filter answers INCLUDE_AND_NEXT_COL for every cell. The expected codes are the
+    // same as in case 1: INCLUDE_AND_NEXT_COL for the committed writers and for the reading
+    // transaction itself.
+    final Filter includeAndAdvance = new FilterBase() {
+      @Override
+      public ReturnCode filterKeyValue(Cell ignored) throws IOException {
+        return ReturnCode.INCLUDE_AND_NEXT_COL;
+      }
+    };
+    TxFilterFactory includeNextFactory = new TxFilterFactory() {
+      @Override
+      public Filter getTxFilter(Transaction reader, Map<byte[], Long> ttls) {
+        return new TransactionVisibilityFilter(reader, ttls, false, ScanType.USER_SCAN, includeAndAdvance);
+      }
+    };
+    runFilteringTest(includeNextFactory, ImmutableList.of(
+        Filter.ReturnCode.INCLUDE_AND_NEXT_COL, Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+        Filter.ReturnCode.SKIP, Filter.ReturnCode.SKIP,
+        Filter.ReturnCode.INCLUDE_AND_NEXT_COL, Filter.ReturnCode.INCLUDE_AND_NEXT_COL));
+
+    // Case 4: the sub-filter answers NEXT_COL for every cell. The expected codes are the same as in
+    // case 2: NEXT_COL for the committed writers and for the reading transaction itself, i.e. the
+    // sub-filter's exclusion is expected to be honoured.
+    final Filter alwaysNextCol = new FilterBase() {
+      @Override
+      public ReturnCode filterKeyValue(Cell ignored) throws IOException {
+        return ReturnCode.NEXT_COL;
+      }
+    };
+    TxFilterFactory nextColFactory = new TxFilterFactory() {
+      @Override
+      public Filter getTxFilter(Transaction reader, Map<byte[], Long> ttls) {
+        return new TransactionVisibilityFilter(reader, ttls, false, ScanType.USER_SCAN, alwaysNextCol);
+      }
+    };
+    runFilteringTest(nextColFactory, ImmutableList.of(
+        Filter.ReturnCode.NEXT_COL, Filter.ReturnCode.NEXT_COL,
+        Filter.ReturnCode.SKIP, Filter.ReturnCode.SKIP,
+        Filter.ReturnCode.NEXT_COL, Filter.ReturnCode.NEXT_COL));
+
+  }
+
+  @Test
+  public void testSubFilterOverride() throws Exception {
+    // Case 1: the sub-filter answers INCLUDE for every cell. With CustomTxFilter combining the codes,
+    // cells from committed writers and from the reading transaction itself are expected to come back
+    // as plain INCLUDE; the in-progress and invalid writers are expected to be SKIP in every case.
+    final FilterBase alwaysInclude = new FilterBase() {
+      @Override
+      public ReturnCode filterKeyValue(Cell ignored) throws IOException {
+        return ReturnCode.INCLUDE;
+      }
+    };
+    TxFilterFactory includeFactory = new TxFilterFactory() {
+      @Override
+      public Filter getTxFilter(Transaction reader, Map<byte[], Long> ttls) {
+        return new CustomTxFilter(reader, ttls, false, ScanType.USER_SCAN, alwaysInclude);
+      }
+    };
+    runFilteringTest(includeFactory, ImmutableList.of(
+        Filter.ReturnCode.INCLUDE, Filter.ReturnCode.INCLUDE,
+        Filter.ReturnCode.SKIP, Filter.ReturnCode.SKIP,
+        Filter.ReturnCode.INCLUDE, Filter.ReturnCode.INCLUDE));
+
+    // Case 2: the sub-filter answers SKIP for every cell. Cells that the transaction filter would
+    // otherwise include are expected to come back as NEXT_COL, so nothing is returned for them and
+    // the scan moves on to the next column.
+    final Filter alwaysSkip = new FilterBase() {
+      @Override
+      public ReturnCode filterKeyValue(Cell ignored) throws IOException {
+        return ReturnCode.SKIP;
+      }
+    };
+    TxFilterFactory skipFactory = new TxFilterFactory() {
+      @Override
+      public Filter getTxFilter(Transaction reader, Map<byte[], Long> ttls) {
+        return new CustomTxFilter(reader, ttls, false, ScanType.USER_SCAN, alwaysSkip);
+      }
+    };
+    runFilteringTest(skipFactory, ImmutableList.of(
+        Filter.ReturnCode.NEXT_COL, Filter.ReturnCode.NEXT_COL,
+        Filter.ReturnCode.SKIP, Filter.ReturnCode.SKIP,
+        Filter.ReturnCode.NEXT_COL, Filter.ReturnCode.NEXT_COL));
+
+    // Case 3: the sub-filter answers INCLUDE_AND_NEXT_COL for every cell. That code is expected to be
+    // handed back as is for the committed writers and for the reading transaction itself.
+    // (CustomTxFilter returns the sub-filter's code for everything except SKIP.)
+    final Filter includeAndAdvance = new FilterBase() {
+      @Override
+      public ReturnCode filterKeyValue(Cell ignored) throws IOException {
+        return ReturnCode.INCLUDE_AND_NEXT_COL;
+      }
+    };
+    TxFilterFactory includeNextFactory = new TxFilterFactory() {
+      @Override
+      public Filter getTxFilter(Transaction reader, Map<byte[], Long> ttls) {
+        return new CustomTxFilter(reader, ttls, false, ScanType.USER_SCAN, includeAndAdvance);
+      }
+    };
+    runFilteringTest(includeNextFactory, ImmutableList.of(
+        Filter.ReturnCode.INCLUDE_AND_NEXT_COL, Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+        Filter.ReturnCode.SKIP, Filter.ReturnCode.SKIP,
+        Filter.ReturnCode.INCLUDE_AND_NEXT_COL, Filter.ReturnCode.INCLUDE_AND_NEXT_COL));
+
+    // Case 4: the sub-filter answers NEXT_COL for every cell. That code is expected to be handed back
+    // as is for the committed writers and for the reading transaction itself, the same outcome as
+    // in case 2.
+    final Filter alwaysNextCol = new FilterBase() {
+      @Override
+      public ReturnCode filterKeyValue(Cell ignored) throws IOException {
+        return ReturnCode.NEXT_COL;
+      }
+    };
+    TxFilterFactory nextColFactory = new TxFilterFactory() {
+      @Override
+      public Filter getTxFilter(Transaction reader, Map<byte[], Long> ttls) {
+        return new CustomTxFilter(reader, ttls, false, ScanType.USER_SCAN, alwaysNextCol);
+      }
+    };
+    runFilteringTest(nextColFactory, ImmutableList.of(
+        Filter.ReturnCode.NEXT_COL, Filter.ReturnCode.NEXT_COL,
+        Filter.ReturnCode.SKIP, Filter.ReturnCode.SKIP,
+        Filter.ReturnCode.NEXT_COL, Filter.ReturnCode.NEXT_COL));
+
+  }
+
+  /**
+   * Starts six short transactions, leaving them committed, in progress or invalid, and then checks the
+   * code returned by the filter under test for a cell written by each of them.
+   *
+   * @param txFilterFactory creates the filter under test; it is handed the sixth transaction as reader
+   * @param assertCodes expected codes, where index {@code i} belongs to the (i + 1)-th transaction started
+   * @throws Exception if a transaction operation or the filter fails unexpectedly
+   */
+  private void runFilteringTest(TxFilterFactory txFilterFactory,
+                                List<Filter.ReturnCode> assertCodes) throws Exception {
+    /*
+     * The calls below give us a transaction state something like the following
+     * (numbers only reflect ordering, not actual transaction IDs):
+     *   6  - in progress (the reader)
+     *   5  - committed
+     *   4  - invalid
+     *   3  - in progress
+     *   2  - committed
+     *   1  - committed
+     *
+     *   read ptr = 5
+     *   write ptr = 6
+     */
+    Transaction first = txManager.startShort();
+    assertTrue(txManager.canCommit(first, EMPTY_CHANGESET));
+    assertTrue(txManager.commit(first));
+
+    Transaction second = txManager.startShort();
+    assertTrue(txManager.canCommit(second, EMPTY_CHANGESET));
+    assertTrue(txManager.commit(second));
+
+    Transaction pending = txManager.startShort();
+    Transaction invalidated = txManager.startShort();
+    txManager.invalidate(invalidated.getTransactionId());
+
+    Transaction lastCommitted = txManager.startShort();
+    assertTrue(txManager.canCommit(lastCommitted, EMPTY_CHANGESET));
+    assertTrue(txManager.commit(lastCommitted));
+
+    Transaction reader = txManager.startShort();
+    Map<byte[], Long> noTtls = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+    Filter filter = txFilterFactory.getTxFilter(reader, noTtls);
+
+    // The same filter instance sees one cell per writer, from the newest writer down to the oldest;
+    // every cell uses the same row and value and differs only in its timestamp (the writer's id).
+    Transaction[] writers = {first, second, pending, invalidated, lastCommitted, reader};
+    for (int i = writers.length - 1; i >= 0; i--) {
+      assertEquals(assertCodes.get(i),
+                   filter.filterKeyValue(newKeyValue("row1", "val1", writers[i].getTransactionId())));
+    }
+  }
+
+  /**
+   * Test filtering for TTL settings, both for cells stamped with transactional timestamps and for
+   * pre-existing cells whose timestamps are scaled down by {@code TxConstants.MAX_TX_PER_MS}.
+   * @throws Exception if the transaction manager or the filter fails unexpectedly
+   */
+  @Test
+  public void testTTLFiltering() throws Exception {
+    Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+    ttlByFamily.put(FAM, 10L);
+    ttlByFamily.put(FAM2, 30L);
+    ttlByFamily.put(FAM3, 0L);
+
+    Transaction reader = txManager.startShort();
+    long now = reader.getVisibilityUpperBound();
+    Filter filter = new TransactionVisibilityFilter(reader, ttlByFamily, false, ScanType.USER_SCAN);
+    final Filter.ReturnCode kept = Filter.ReturnCode.INCLUDE_AND_NEXT_COL;
+    final Filter.ReturnCode expired = Filter.ReturnCode.NEXT_COL;
+    final long step = TxConstants.MAX_TX_PER_MS;
+
+    // FAM (TTL 10): cells 0 and 1 steps older than the upper bound are kept, 11 steps older is expired
+    assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now)));
+    assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now - 1 * step)));
+    assertEquals(expired, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now - 11 * step)));
+
+    // FAM2 (TTL 30): 11 and 21 steps older are kept, 31 steps older is expired
+    assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM2, "val1", now - 11 * step)));
+    assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM2, "val1", now - 21 * step)));
+    assertEquals(expired, filter.filterKeyValue(newKeyValue("row1", FAM2, "val1", now - 31 * step)));
+
+    // FAM3 (TTL 0): no cell is expected to expire, whatever its age
+    assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", now - 31 * step)));
+    assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", now - 1001 * step)));
+
+    // a different row in FAM is expected to be judged the same way as row1
+    assertEquals(kept, filter.filterKeyValue(newKeyValue("row2", FAM, "val1", now)));
+    assertEquals(kept, filter.filterKeyValue(newKeyValue("row2", FAM, "val1", now - 1 * step)));
+
+    // Verify ttl for pre-existing, non-transactional data: its timestamps are the upper bound
+    // divided by MAX_TX_PER_MS, so ages below are plain offsets rather than multiples of step
+    long preNow = now / TxConstants.MAX_TX_PER_MS;
+
+    // FAM (TTL 10): ages 0, 9 and 10 are kept, age 11 is expired
+    assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", preNow)));
+    assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", preNow - 9L)));
+    assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", preNow - 10L)));
+    assertEquals(expired, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", preNow - 11L)));
+
+    // FAM3 (TTL 0): still nothing is expected to expire
+    assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", preNow)));
+    assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", preNow - 9L)));
+    assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", preNow - 10L)));
+    assertEquals(kept, filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", preNow - 1001L)));
+  }
+
+  protected KeyValue newKeyValue(String rowkey, String value, long timestamp) {
+    return newKeyValue(rowkey, FAM, value, timestamp);  // same cell, written to the FAM family
+  }
+
+  protected KeyValue newKeyValue(String rowkey, byte[] family, String value, long timestamp) {
+    return new KeyValue(Bytes.toBytes(rowkey), family, COL, timestamp, Bytes.toBytes(value));
+  }
+
+  private interface TxFilterFactory {  // supplies the filter under test to runFilteringTest()
+    Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs);
+  }
+
+  /** Filter that lets the sub-filter's code win, except that a sub-filter SKIP may become NEXT_COL. */
+  private static class CustomTxFilter extends TransactionVisibilityFilter {
+    // static on purpose: nothing in here touches the enclosing test instance
+    public CustomTxFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, ScanType scanType,
+                          @Nullable Filter cellFilter) {
+      super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter);
+    }
+
+    @Override
+    protected ReturnCode determineReturnCode(ReturnCode txFilterCode, ReturnCode subFilterCode) {
+      switch (subFilterCode) {
+        case SKIP:
+          // keep SKIP only when the transaction filter said INCLUDE; otherwise move on to the next column
+          return txFilterCode == ReturnCode.INCLUDE ? ReturnCode.SKIP : ReturnCode.NEXT_COL;
+        default:
+          // INCLUDE, INCLUDE_AND_NEXT_COL and every other code are passed through unchanged
+          return subFilterCode;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2246abff/tephra-hbase-compat-1.1-base/tephra-hbase-compat-1.1/pom.xml
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/tephra-hbase-compat-1.1/pom.xml b/tephra-hbase-compat-1.1-base/tephra-hbase-compat-1.1/pom.xml
new file mode 100644
index 0000000..537e237
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/tephra-hbase-compat-1.1/pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>org.apache.tephra</groupId>
+    <artifactId>tephra-hbase-compat-1.1-base</artifactId>
+    <version>0.9.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>tephra-hbase-compat-1.1</artifactId>
+  <name>Apache Tephra HBase 1.1 Compatibility</name>
+
+  <properties>
+    <hadoop.version>2.5.1</hadoop.version>
+    <hbase11.version>1.1.1</hbase11.version>
+  </properties>
+
+  <build>
+    <sourceDirectory>../src/main/java</sourceDirectory>
+    <testSourceDirectory>../src/test/java</testSourceDirectory>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <version>${hbase11.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <version>${hbase11.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-protocol</artifactId>
+      <version>${hbase11.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase11.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase11.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <version>${hbase11.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>asm</groupId>
+          <artifactId>asm</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2246abff/tephra-hbase-compat-1.1-base/tephra-hbase-compat-1.2-cdh/pom.xml
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/tephra-hbase-compat-1.2-cdh/pom.xml b/tephra-hbase-compat-1.1-base/tephra-hbase-compat-1.2-cdh/pom.xml
new file mode 100644
index 0000000..ad84a05
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/tephra-hbase-compat-1.2-cdh/pom.xml
@@ -0,0 +1,125 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>org.apache.tephra</groupId>
+    <artifactId>tephra-hbase-compat-1.1-base</artifactId>
+    <version>0.9.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>tephra-hbase-compat-1.2-cdh</artifactId>
+  <name>Apache Tephra HBase 1.2 Compatibility for CDH</name>
+
+  <properties>
+    <!-- Hadoop version supported by CDH -->
+    <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
+    <hbase12cdh.version>1.2.0-cdh5.7.0</hbase12cdh.version>
+  </properties>
+
+  <repositories>
+    <repository>
+      <id>cloudera</id>
+      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
+    </repository>
+  </repositories>
+
+  <build>
+    <sourceDirectory>../src/main/java</sourceDirectory>
+    <testSourceDirectory>../src/test/java</testSourceDirectory>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <version>${hbase12cdh.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <version>${hbase12cdh.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-protocol</artifactId>
+      <version>${hbase12cdh.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase12cdh.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase12cdh.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <version>${hbase12cdh.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>asm</groupId>
+          <artifactId>asm</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2246abff/tephra-hbase-compat-1.1-base/tephra-hbase-compat-1.2/pom.xml
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/tephra-hbase-compat-1.2/pom.xml b/tephra-hbase-compat-1.1-base/tephra-hbase-compat-1.2/pom.xml
new file mode 100644
index 0000000..18b26dd
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/tephra-hbase-compat-1.2/pom.xml
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>org.apache.tephra</groupId>
+    <artifactId>tephra-hbase-compat-1.1-base</artifactId>
+    <version>0.9.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>tephra-hbase-compat-1.2</artifactId>
+  <name>Apache Tephra HBase 1.2 Compatibility</name>
+
+  <properties>
+    <hadoop.version>2.5.1</hadoop.version>
+    <hbase12.version>1.2.0</hbase12.version>
+
+  </properties>
+
+  <build>
+    <sourceDirectory>../src/main/java</sourceDirectory>
+    <testSourceDirectory>../src/test/java</testSourceDirectory>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <version>${hbase12.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <version>${hbase12.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-protocol</artifactId>
+      <version>${hbase12.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase12.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase12.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <version>${hbase12.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>asm</groupId>
+          <artifactId>asm</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2246abff/tephra-hbase-compat-1.1/pom.xml
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1/pom.xml b/tephra-hbase-compat-1.1/pom.xml
deleted file mode 100644
index 543ee7f..0000000
--- a/tephra-hbase-compat-1.1/pom.xml
+++ /dev/null
@@ -1,150 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~      http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <parent>
-    <groupId>org.apache.tephra</groupId>
-    <artifactId>tephra</artifactId>
-    <version>0.9.0-incubating-SNAPSHOT</version>
-  </parent>
-  <modelVersion>4.0.0</modelVersion>
-
-  <artifactId>tephra-hbase-compat-1.1</artifactId>
-  <name>Apache Tephra HBase 1.1 Compatibility</name>
-
-  <properties>
-    <!-- HBase 1.1 only supports Hadoop 2.4 or newer -->
-    <hadoop.version>2.4.0</hadoop.version>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.tephra</groupId>
-      <artifactId>tephra-api</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.tephra</groupId>
-      <artifactId>tephra-core</artifactId>
-      <version>${project.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hbase</groupId>
-          <artifactId>hbase</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase-common</artifactId>
-      <version>${hbase11.version}</version>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-log4j12</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase-client</artifactId>
-      <version>${hbase11.version}</version>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-log4j12</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase-protocol</artifactId>
-      <version>${hbase11.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase-server</artifactId>
-      <version>${hbase11.version}</version>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-log4j12</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <!-- Test dependencies -->
-    <dependency>
-      <groupId>org.apache.tephra</groupId>
-      <artifactId>tephra-core</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>log4j-over-slf4j</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>jcl-over-slf4j</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase-server</artifactId>
-      <version>${hbase11.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase-testing-util</artifactId>
-      <version>${hbase11.version}</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>asm</groupId>
-          <artifactId>asm</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-log4j12</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-  </dependencies>
-
-  <profiles>
-    <profile>
-      <id>hbase1.2</id>
-      <properties>
-        <hbase11.version>1.2.1</hbase11.version>
-      </properties>
-    </profile>
-  </profiles>
-
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2246abff/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/HBase11ConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/HBase11ConfigurationProvider.java b/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/HBase11ConfigurationProvider.java
deleted file mode 100644
index 7ab7a18..0000000
--- a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/HBase11ConfigurationProvider.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tephra.hbase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.tephra.util.ConfigurationProvider;
-
-/**
- * {@link ConfigurationProvider} implementation for HBase 1.1 that obtains every
- * {@link Configuration} through {@link HBaseConfiguration}.
- */
-public class HBase11ConfigurationProvider extends ConfigurationProvider {
-  /**
-   * Delegates to {@link HBaseConfiguration#create(Configuration)}.
-   *
-   * @param baseConf the configuration handed to {@code HBaseConfiguration.create}
-   * @return the configuration produced by {@code HBaseConfiguration.create(baseConf)}
-   */
-  @Override
-  public Configuration get(Configuration baseConf) {
-    return HBaseConfiguration.create(baseConf);
-  }
-
-  /**
-   * Delegates to {@link HBaseConfiguration#create()}.
-   *
-   * @return the configuration produced by {@code HBaseConfiguration.create()}
-   */
-  @Override
-  public Configuration get() {
-    return HBaseConfiguration.create();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2246abff/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/SecondaryIndexTable.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/SecondaryIndexTable.java b/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/SecondaryIndexTable.java
deleted file mode 100644
index 8bf8768..0000000
--- a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase/SecondaryIndexTable.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tephra.hbase;
-
-import com.google.common.base.Throwables;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.tephra.TransactionContext;
-import org.apache.tephra.TransactionFailureException;
-import org.apache.tephra.distributed.TransactionServiceClient;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A Transactional SecondaryIndexTable.
- */
-public class SecondaryIndexTable implements Closeable {
-  private byte[] secondaryIndex;
-  private TransactionAwareHTable transactionAwareHTable;
-  private TransactionAwareHTable secondaryIndexTable;
-  private TransactionContext transactionContext;
-  private final TableName secondaryIndexTableName;
-  private static final byte[] secondaryIndexFamily = Bytes.toBytes("secondaryIndexFamily");
-  private static final byte[] secondaryIndexQualifier = Bytes.toBytes('r');
-  private static final byte[] DELIMITER  = new byte[] {0};
-
-  public SecondaryIndexTable(TransactionServiceClient transactionServiceClient, HTableInterface hTable,
-                             byte[] secondaryIndex) {
-    secondaryIndexTableName = TableName.valueOf(hTable.getName().getNameAsString() + ".idx");
-    HTable secondaryIndexHTable = null;
-    try (HBaseAdmin hBaseAdmin = new HBaseAdmin(hTable.getConfiguration())) {
-      if (!hBaseAdmin.tableExists(secondaryIndexTableName)) {
-        hBaseAdmin.createTable(new HTableDescriptor(secondaryIndexTableName));
-      }
-      secondaryIndexHTable = new HTable(hTable.getConfiguration(), secondaryIndexTableName);
-    } catch (Exception e) {
-      Throwables.propagate(e);
-    }
-
-    this.secondaryIndex = secondaryIndex;
-    this.transactionAwareHTable = new TransactionAwareHTable(hTable);
-    this.secondaryIndexTable = new TransactionAwareHTable(secondaryIndexHTable);
-    this.transactionContext = new TransactionContext(transactionServiceClient, transactionAwareHTable,
-                                                     secondaryIndexTable);
-  }
-
-  public Result get(Get get) throws IOException {
-    return get(Collections.singletonList(get))[0];
-  }
-
-  public Result[] get(List<Get> gets) throws IOException {
-    try {
-      transactionContext.start();
-      Result[] result = transactionAwareHTable.get(gets);
-      transactionContext.finish();
-      return result;
-    } catch (Exception e) {
-      try {
-        transactionContext.abort();
-      } catch (TransactionFailureException e1) {
-        throw new IOException("Could not rollback transaction", e1);
-      }
-    }
-    return null;
-  }
-
-  public Result[] getByIndex(byte[] value) throws IOException {
-    try {
-      transactionContext.start();
-      Scan scan = new Scan(value, Bytes.add(value, new byte[0]));
-      scan.addColumn(secondaryIndexFamily, secondaryIndexQualifier);
-      ResultScanner indexScanner = secondaryIndexTable.getScanner(scan);
-
-      ArrayList<Get> gets = new ArrayList<>();
-      for (Result result : indexScanner) {
-        for (Cell cell : result.listCells()) {
-          gets.add(new Get(cell.getValue()));
-        }
-      }
-      Result[] results = transactionAwareHTable.get(gets);
-      transactionContext.finish();
-      return results;
-    } catch (Exception e) {
-      try {
-        transactionContext.abort();
-      } catch (TransactionFailureException e1) {
-        throw new IOException("Could not rollback transaction", e1);
-      }
-    }
-    return null;
-  }
-
-  public void put(Put put) throws IOException {
-    put(Collections.singletonList(put));
-  }
-
-
-  public void put(List<Put> puts) throws IOException {
-    try {
-      transactionContext.start();
-      ArrayList<Put> secondaryIndexPuts = new ArrayList<>();
-      for (Put put : puts) {
-        List<Put> indexPuts = new ArrayList<>();
-        Set<Map.Entry<byte[], List<KeyValue>>> familyMap = put.getFamilyMap().entrySet();
-        for (Map.Entry<byte [], List<KeyValue>> family : familyMap) {
-          for (KeyValue value : family.getValue()) {
-            if (Bytes.equals(value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(),
-                             secondaryIndex, 0, secondaryIndex.length)) {
-              byte[] secondaryRow = Bytes.add(value.getQualifier(), DELIMITER,
-                                                    Bytes.add(value.getValue(), DELIMITER,
-                                                              value.getRow()));
-              Put indexPut = new Put(secondaryRow);
-              indexPut.add(secondaryIndexFamily, secondaryIndexQualifier, put.getRow());
-              indexPuts.add(indexPut);
-            }
-          }
-        }
-        secondaryIndexPuts.addAll(indexPuts);
-      }
-      transactionAwareHTable.put(puts);
-      secondaryIndexTable.put(secondaryIndexPuts);
-      transactionContext.finish();
-    } catch (Exception e) {
-      try {
-        transactionContext.abort();
-      } catch (TransactionFailureException e1) {
-        throw new IOException("Could not rollback transaction", e1);
-      }
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    try {
-      transactionAwareHTable.close();
-    } catch (IOException e) {
-      try {
-        secondaryIndexTable.close();
-      } catch (IOException ex) {
-        e.addSuppressed(e);
-      }
-      throw e;
-    }
-    secondaryIndexTable.close();
-  }
-}