You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2018/03/10 01:56:31 UTC
[2/6] incubator-tephra git commit: TEPHRA-272 Add HBase 2.0
compatibility module
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/db6ef6d2/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
new file mode 100644
index 0000000..e909d73
--- /dev/null
+++ b/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -0,0 +1,676 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor;
+
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MockRegionServerServices;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
+import org.apache.hadoop.hbase.regionserver.ChunkCreator;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
+import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
+import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.TransactionStateCache;
+import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
+import org.apache.tephra.manager.InvalidTxList;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.HDFSTransactionStateStorage;
+import org.apache.tephra.persist.TransactionSnapshot;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.apache.tephra.snapshot.DefaultSnapshotCodec;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+import org.apache.tephra.util.TxUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests filtering of invalid transaction data by the {@link TransactionProcessor} coprocessor.
+ */
+public class TransactionProcessorTest {
+ private static final Logger LOG = LoggerFactory.getLogger(TransactionProcessorTest.class);
+ protected static ChunkCreator chunkCreator;
+ // 8 versions, 1 hour apart, latest is current ts.
+ private static final long[] V;
+
+ static {
+ long now = System.currentTimeMillis();
+ V = new long[9];
+ for (int i = 0; i < V.length; i++) {
+ V[i] = (now - TimeUnit.HOURS.toMillis(8 - i)) * TxConstants.MAX_TX_PER_MS;
+ }
+ }
+
+ @ClassRule
+ public static TemporaryFolder tmpFolder = new TemporaryFolder();
+ private static MiniDFSCluster dfsCluster;
+ private static Configuration conf;
+ private static LongArrayList invalidSet = new LongArrayList(new long[]{V[3], V[5], V[7]});
+ private static TransactionVisibilityState txVisibilityState;
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ Configuration hConf = new Configuration();
+ String rootDir = tmpFolder.newFolder().getAbsolutePath();
+ hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, rootDir);
+ hConf.set(HConstants.HBASE_DIR, rootDir + "/hbase");
+
+ dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build();
+ dfsCluster.waitActive();
+ conf = HBaseConfiguration.create(dfsCluster.getFileSystem().getConf());
+
+ conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
+ conf.unset(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES);
+ String localTestDir = tmpFolder.newFolder().getAbsolutePath();
+ conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, localTestDir);
+ conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
+
+ // write an initial transaction snapshot
+ InvalidTxList invalidTxList = new InvalidTxList();
+ invalidTxList.addAll(invalidSet);
+ TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom(
+ System.currentTimeMillis(), V[6] - 1, V[7], invalidTxList,
+ // this will set visibility upper bound to V[6]
+ Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
+ V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),
+ new HashMap<Long, TransactionManager.ChangeSet>(), new TreeMap<Long, TransactionManager.ChangeSet>());
+ txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(),
+ txSnapshot.getWritePointer(), txSnapshot.getInvalid(),
+ txSnapshot.getInProgress());
+ HDFSTransactionStateStorage tmpStorage =
+ new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
+ tmpStorage.startAndWait();
+ tmpStorage.writeSnapshot(txSnapshot);
+ tmpStorage.stopAndWait();
+ long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
+ .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
+ chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
+ globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
+ assertTrue(chunkCreator != null);
+ }
+
+ @AfterClass
+ public static void shutdownAfterClass() throws Exception {
+ dfsCluster.shutdown();
+ }
+
+ @Test
+ public void testDataJanitorRegionScanner() throws Exception {
+ String tableName = "TestRegionScanner";
+ byte[] familyBytes = Bytes.toBytes("f");
+ byte[] columnBytes = Bytes.toBytes("c");
+ HRegion region = createRegion(tableName, familyBytes, TimeUnit.HOURS.toMillis(3));
+ try {
+ region.initialize();
+ TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
+
+ for (int i = 1; i <= 8; i++) {
+ for (int k = 1; k <= i; k++) {
+ Put p = new Put(Bytes.toBytes(i));
+ p.addColumn(familyBytes, columnBytes, V[k], Bytes.toBytes(V[k]));
+ region.put(p);
+ }
+ }
+
+ List<Cell> results = Lists.newArrayList();
+
+ // force a flush to clear the data
+ // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
+
+ LOG.info("Flushing region " + region.getRegionInfo().getRegionNameAsString());
+ FlushResultImpl flushResult = region.flushcache(true, false, new FlushLifeCycleTracker() { });
+ Assert.assertTrue("Unexpected flush result: " + flushResult, flushResult.isFlushSucceeded());
+
+ // now a normal scan should only return the valid rows
+ // do not use a filter here to test that cleanup works on flush
+ Scan scan = new Scan();
+ scan.setMaxVersions(10);
+ RegionScanner regionScanner = region.getScanner(scan);
+
+ // first returned value should be "4" with version "4"
+ results.clear();
+ assertTrue(regionScanner.next(results));
+ assertKeyValueMatches(results, 4, new long[]{V[4]});
+
+ results.clear();
+ assertTrue(regionScanner.next(results));
+ assertKeyValueMatches(results, 5, new long[] {V[4]});
+
+ results.clear();
+ assertTrue(regionScanner.next(results));
+ assertKeyValueMatches(results, 6, new long[]{V[6], V[4]});
+
+ results.clear();
+ assertTrue(regionScanner.next(results));
+ assertKeyValueMatches(results, 7, new long[]{V[6], V[4]});
+
+ results.clear();
+ assertFalse(regionScanner.next(results));
+ assertKeyValueMatches(results, 8, new long[] {V[8], V[6], V[4]});
+ } finally {
+ region.close();
+ }
+ }
+
+ @Test
+ public void testDeleteFiltering() throws Exception {
+ String tableName = "TestDeleteFiltering";
+ byte[] familyBytes = Bytes.toBytes("f");
+ byte[] columnBytes = Bytes.toBytes("c");
+ HRegion region = createRegion(tableName, familyBytes, 0);
+ try {
+ region.initialize();
+ TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
+
+ byte[] row = Bytes.toBytes(1);
+ for (int i = 4; i < V.length; i++) {
+ Put p = new Put(row);
+ p.addColumn(familyBytes, columnBytes, V[i], Bytes.toBytes(V[i]));
+ region.put(p);
+ }
+
+ // delete from the third entry back
+ // take that cell's timestamp + 1 to simulate a delete in a new tx
+ long deleteTs = V[5] + 1;
+ Delete d = new Delete(row, deleteTs);
+ LOG.info("Issuing delete at timestamp " + deleteTs);
+ // row deletes are not yet supported (TransactionAwareHTable normally handles this)
+ d.addColumns(familyBytes, columnBytes);
+ region.delete(d);
+
+ List<Cell> results = Lists.newArrayList();
+
+ // force a flush to clear the data
+ // during flush, we should drop the deleted version, but not the others
+ LOG.info("Flushing region " + region.getRegionInfo().getRegionNameAsString());
+ region.flushcache(true, false, new FlushLifeCycleTracker() { });
+
+ // now a normal scan should return row with versions at: V[8], V[6].
+ // V[7] is invalid and V[5] and prior are deleted.
+ Scan scan = new Scan();
+ scan.setMaxVersions(10);
+ RegionScanner regionScanner = region.getScanner(scan);
+ // should be only one row
+ assertFalse(regionScanner.next(results));
+ assertKeyValueMatches(results, 1,
+ new long[]{V[8], V[6], deleteTs},
+ new byte[][]{Bytes.toBytes(V[8]), Bytes.toBytes(V[6]), new byte[0]});
+ } finally {
+ region.close();
+ }
+ }
+
+ @Test
+ public void testDeleteMarkerCleanup() throws Exception {
+ String tableName = "TestDeleteMarkerCleanup";
+ byte[] familyBytes = Bytes.toBytes("f");
+ HRegion region = createRegion(tableName, familyBytes, 0);
+ try {
+ region.initialize();
+
+ // all puts use a timestamp before the tx snapshot's visibility upper bound, making them eligible for removal
+ long writeTs = txVisibilityState.getVisibilityUpperBound() - 10;
+ // deletes are performed after the writes, but still before the visibility upper bound
+ long deleteTs = writeTs + 1;
+ // write separate columns to confirm that delete markers survive across flushes
+ byte[] row = Bytes.toBytes(100);
+ Put p = new Put(row);
+
+ LOG.info("Writing columns at timestamp " + writeTs);
+ for (int i = 0; i < 5; i++) {
+ byte[] iBytes = Bytes.toBytes(i);
+ p.addColumn(familyBytes, iBytes, writeTs, iBytes);
+ }
+ region.put(p);
+ // read all back
+ Scan scan = new Scan(row);
+ RegionScanner regionScanner = region.getScanner(scan);
+ List<Cell> results = Lists.newArrayList();
+ assertFalse(regionScanner.next(results));
+
+ for (int i = 0; i < 5; i++) {
+ Cell cell = results.get(i);
+ assertArrayEquals(row, CellUtil.cloneRow(cell));
+ byte[] idxBytes = Bytes.toBytes(i);
+ assertArrayEquals(idxBytes, CellUtil.cloneQualifier(cell));
+ assertArrayEquals(idxBytes, CellUtil.cloneValue(cell));
+ }
+
+ // force a flush to clear the memstore
+ LOG.info("Before delete, flushing region " + region.getRegionInfo().getRegionNameAsString());
+ region.flushcache(false, false, new FlushLifeCycleTracker() { });
+ // delete the odd entries
+ for (int i = 0; i < 5; i++) {
+ if (i % 2 == 1) {
+ // deletes are performed as puts with empty values
+ Put deletePut = new Put(row);
+ deletePut.addColumn(familyBytes, Bytes.toBytes(i), deleteTs, new byte[0]);
+ region.put(deletePut);
+ }
+ }
+
+ // read all back
+ scan = new Scan(row);
+ scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState),
+ new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN));
+ regionScanner = region.getScanner(scan);
+ results = Lists.newArrayList();
+ assertFalse(regionScanner.next(results));
+ assertEquals(3, results.size());
+ // only even columns should exist
+ for (int i = 0; i < 3; i++) {
+ Cell cell = results.get(i);
+ LOG.info("Got cell " + cell);
+ assertArrayEquals(row, CellUtil.cloneRow(cell));
+ byte[] idxBytes = Bytes.toBytes(i * 2);
+ assertArrayEquals(idxBytes, CellUtil.cloneQualifier(cell));
+ assertArrayEquals(idxBytes, CellUtil.cloneValue(cell));
+ }
+
+ // force another flush on the delete markers
+ // during flush, we should retain the delete markers, since they can only safely be dropped by a major compaction
+ LOG.info("After delete, flushing region " + region.getRegionInfo().getRegionNameAsString());
+ FlushResultImpl flushResultImpl = region.flushcache(true, false, new FlushLifeCycleTracker() { });
+ assertTrue(flushResultImpl
+ .getResult() == FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED
+ || flushResultImpl.getResult() == FlushResult.Result.FLUSHED_COMPACTION_NEEDED);
+ scan = new Scan(row);
+ scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState),
+ new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN));
+ scan.readVersions(1);
+ regionScanner = region.getScanner(scan);
+ results = Lists.newArrayList();
+ assertFalse(regionScanner.next(results));
+ assertEquals(3, results.size());
+ // only even columns should exist
+ for (int i = 0; i < 3; i++) {
+ Cell cell = results.get(i);
+ assertArrayEquals(row, CellUtil.cloneRow(cell));
+ byte[] idxBytes = Bytes.toBytes(i * 2);
+ assertArrayEquals(idxBytes, CellUtil.cloneQualifier(cell));
+ assertArrayEquals(idxBytes, CellUtil.cloneValue(cell));
+ }
+
+ // force a major compaction
+ LOG.info("Forcing major compaction of region " + region.getRegionInfo().getRegionNameAsString());
+ region.compact(true);
+
+ // perform a raw scan (no filter) to confirm that the delete markers are now gone
+ scan = new Scan(row);
+ System.out.println("scan started");
+ regionScanner = region.getScanner(scan);
+ results = Lists.newArrayList();
+ assertFalse(regionScanner.next(results));
+ assertEquals(3, results.size());
+ // only even columns should exist
+ for (int i = 0; i < 3; i++) {
+ Cell cell = results.get(i);
+ assertArrayEquals(row, CellUtil.cloneRow(cell));
+ byte[] idxBytes = Bytes.toBytes(i * 2);
+ assertArrayEquals(idxBytes, CellUtil.cloneQualifier(cell));
+ assertArrayEquals(idxBytes, CellUtil.cloneValue(cell));
+ }
+ } finally {
+ region.close();
+ }
+ }
+
+ /**
+ * Test that we correctly preserve the timestamp set for column family delete markers. This is not
+ * directly required for the TransactionAwareHTable usage, but is the right thing to do and ensures
+ * that we make it easy to interoperate with other systems.
+ */
+ @Test
+ public void testFamilyDeleteTimestamp() throws Exception {
+ String tableName = "TestFamilyDeleteTimestamp";
+ byte[] family1Bytes = Bytes.toBytes("f1");
+ byte[] columnBytes = Bytes.toBytes("c");
+ byte[] rowBytes = Bytes.toBytes("row");
+ byte[] valBytes = Bytes.toBytes("val");
+ HRegion region = createRegion(tableName, family1Bytes, 0);
+ try {
+ region.initialize();
+
+ long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS;
+ Put p = new Put(rowBytes);
+ p.addColumn(family1Bytes, columnBytes, now - 10, valBytes);
+ region.put(p);
+
+ // issue a family delete with an explicit timestamp
+ Delete delete = new Delete(rowBytes, now);
+ delete.addFamily(family1Bytes, now - 5);
+ region.delete(delete);
+
+ // test that the delete marker preserved the timestamp
+ Scan scan = new Scan();
+ scan.setMaxVersions();
+ RegionScanner scanner = region.getScanner(scan);
+ List<Cell> results = Lists.newArrayList();
+ scanner.next(results);
+ assertEquals(2, results.size());
+ // delete marker should appear first
+ Cell cell = results.get(0);
+ assertArrayEquals(new byte[0], CellUtil.cloneQualifier(cell));
+ assertArrayEquals(new byte[0], CellUtil.cloneValue(cell));
+ assertEquals(now - 5, cell.getTimestamp());
+ // since this is an unfiltered scan against the region, the original put should be next
+ cell = results.get(1);
+ assertArrayEquals(valBytes, CellUtil.cloneValue(cell));
+ assertEquals(now - 10, cell.getTimestamp());
+ scanner.close();
+
+
+ // with a filtered scan the original put should disappear
+ scan = new Scan();
+ scan.setMaxVersions();
+ scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState),
+ new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN));
+ scanner = region.getScanner(scan);
+ results = Lists.newArrayList();
+ scanner.next(results);
+ assertEquals(0, results.size());
+ scanner.close();
+ } finally {
+ region.close();
+ }
+ }
+
+ @Test
+ public void testPreExistingData() throws Exception {
+ String tableName = "TestPreExistingData";
+ byte[] familyBytes = Bytes.toBytes("f");
+ long ttlMillis = TimeUnit.DAYS.toMillis(14);
+ HRegion region = createRegion(tableName, familyBytes, ttlMillis);
+ try {
+ region.initialize();
+
+ // timestamps for pre-existing, non-transactional data
+ long now = txVisibilityState.getVisibilityUpperBound() / TxConstants.MAX_TX_PER_MS;
+ long older = now - ttlMillis / 2;
+ long newer = now - ttlMillis / 3;
+ // timestamps for transactional data
+ long nowTx = txVisibilityState.getVisibilityUpperBound();
+ long olderTx = nowTx - (ttlMillis / 2) * TxConstants.MAX_TX_PER_MS;
+ long newerTx = nowTx - (ttlMillis / 3) * TxConstants.MAX_TX_PER_MS;
+
+ Map<byte[], Long> ttls = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+ ttls.put(familyBytes, ttlMillis);
+
+ List<Cell> cells = new ArrayList<>();
+ cells.add(new KeyValue(Bytes.toBytes("r1"), familyBytes, Bytes.toBytes("c1"), older, Bytes.toBytes("v11")));
+ cells.add(new KeyValue(Bytes.toBytes("r1"), familyBytes, Bytes.toBytes("c2"), newer, Bytes.toBytes("v12")));
+ cells.add(new KeyValue(Bytes.toBytes("r2"), familyBytes, Bytes.toBytes("c1"), older, Bytes.toBytes("v21")));
+ cells.add(new KeyValue(Bytes.toBytes("r2"), familyBytes, Bytes.toBytes("c2"), newer, Bytes.toBytes("v22")));
+ cells.add(new KeyValue(Bytes.toBytes("r3"), familyBytes, Bytes.toBytes("c1"), olderTx, Bytes.toBytes("v31")));
+ cells.add(new KeyValue(Bytes.toBytes("r3"), familyBytes, Bytes.toBytes("c2"), newerTx, Bytes.toBytes("v32")));
+
+ // Write non-transactional and transactional data
+ for (Cell c : cells) {
+ region.put(new Put(CellUtil.cloneRow(c)).addColumn(CellUtil.cloneFamily(c), CellUtil.cloneQualifier(c),
+ c.getTimestamp(), CellUtil.cloneValue(c)));
+ }
+
+ Scan rawScan = new Scan();
+ rawScan.setMaxVersions();
+
+ Transaction dummyTransaction = TxUtils.createDummyTransaction(txVisibilityState);
+ Scan txScan = new Scan();
+ txScan.setMaxVersions();
+ txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true),
+ TxUtils.getMaxVisibleTimestamp(dummyTransaction));
+ txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN));
+
+ // read all back with raw scanner
+ scanAndAssert(region, cells, rawScan);
+
+ // read all back with transaction filter
+ scanAndAssert(region, cells, txScan);
+
+ // force a flush to clear the memstore
+ region.flushcache(true, false, new FlushLifeCycleTracker() { });
+ scanAndAssert(region, cells, txScan);
+
+ // force a major compaction to remove any expired cells
+ region.compact(true);
+ scanAndAssert(region, cells, txScan);
+
+ // Reduce TTL, this should make cells with timestamps older and olderTx expire
+ long newTtl = ttlMillis / 2 - 1;
+ region = updateTtl(region, familyBytes, newTtl);
+ ttls.put(familyBytes, newTtl);
+ txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true),
+ TxUtils.getMaxVisibleTimestamp(dummyTransaction));
+ txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN));
+
+ // Raw scan should still give all cells
+ scanAndAssert(region, cells, rawScan);
+ // However, tx scan should not return expired cells
+ scanAndAssert(region, select(cells, 1, 3, 5), txScan);
+
+ region.flushcache(true, false, new FlushLifeCycleTracker() { });
+ scanAndAssert(region, cells, rawScan);
+
+ // force a major compaction to remove any expired cells
+ region.compact(true);
+ // This time raw scan too should not return expired cells, as they would be dropped during major compaction
+ scanAndAssert(region, select(cells, 1, 3, 5), rawScan);
+
+ // Reduce TTL again to 1 ms, this should expire all cells
+ newTtl = 1;
+ region = updateTtl(region, familyBytes, newTtl);
+ ttls.put(familyBytes, newTtl);
+ txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true),
+ TxUtils.getMaxVisibleTimestamp(dummyTransaction));
+ txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN));
+
+ // force a major compaction to remove expired cells
+ region.compact(true);
+ // This time raw scan should not return any cells, as all cells have expired.
+ scanAndAssert(region, Collections.<Cell>emptyList(), rawScan);
+ } finally {
+ region.close();
+ }
+ }
+
+ private List<Cell> select(List<Cell> cells, int... indexes) {
+ List<Cell> newCells = new ArrayList<>();
+ for (int i : indexes) {
+ newCells.add(cells.get(i));
+ }
+ return newCells;
+ }
+
+ @SuppressWarnings("StatementWithEmptyBody")
+ private void scanAndAssert(HRegion region, List<Cell> expected, Scan scan) throws Exception {
+ try (RegionScanner regionScanner = region.getScanner(scan)) {
+ List<Cell> results = Lists.newArrayList();
+ while (regionScanner.next(results)) { }
+ assertEquals(expected, results);
+ }
+ }
+
+ private HRegion updateTtl(HRegion region, byte[] family, long ttl) throws Exception {
+ region.close();
+ TableDescriptorBuilder tableBuilder =
+ TableDescriptorBuilder.newBuilder(region.getTableDescriptor());
+ ColumnFamilyDescriptorBuilder cfd =
+ ColumnFamilyDescriptorBuilder.newBuilder(tableBuilder.build().getColumnFamily(family));
+ if (ttl > 0) {
+ cfd.setValue(Bytes.toBytes(TxConstants.PROPERTY_TTL), Bytes.toBytes(String.valueOf(ttl)));
+ }
+ cfd.setMaxVersions(10);
+ tableBuilder.removeColumnFamily(family);
+ tableBuilder.addColumnFamily(cfd.build());
+ return HRegion
+ .openHRegion(region.getRegionInfo(), tableBuilder.build(), region.getWAL(), conf,
+ new LocalRegionServerServices(conf, ServerName
+ .valueOf(InetAddress.getLocalHost().getHostName(), 0, System.currentTimeMillis())),
+ null);
+ }
+
+ private HRegion createRegion(String tableName, byte[] family, long ttl) throws IOException {
+ HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
+ HColumnDescriptor cfd = new HColumnDescriptor(family);
+ if (ttl > 0) {
+ cfd.setValue(TxConstants.PROPERTY_TTL, String.valueOf(ttl));
+ }
+ cfd.setMaxVersions(10);
+ htd.addFamily(cfd);
+ htd.addCoprocessor(TransactionProcessor.class.getName());
+ Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), htd.getTableName());
+ FileSystem fs = FileSystem.get(conf);
+ assertTrue(fs.mkdirs(tablePath));
+ WALFactory walFactory = new WALFactory(conf, null, tableName + ".hlog");
+ HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
+ WAL hLog = walFactory.getWAL(info);
+ HRegionInfo regionInfo = new HRegionInfo(TableName.valueOf(tableName));
+ HRegionFileSystem regionFS = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tablePath, regionInfo);
+ return new HRegion(regionFS, hLog, conf, htd,
+ new LocalRegionServerServices(conf, ServerName.valueOf(
+ InetAddress.getLocalHost().getHostName(), 0, System.currentTimeMillis())));
+ }
+
+ private void assertKeyValueMatches(List<Cell> results, int index, long[] versions) {
+ byte[][] values = new byte[versions.length][];
+ for (int i = 0; i < versions.length; i++) {
+ values[i] = Bytes.toBytes(versions[i]);
+ }
+ assertKeyValueMatches(results, index, versions, values);
+ }
+
+ private void assertKeyValueMatches(List<Cell> results, int index, long[] versions, byte[][] values) {
+ assertEquals(versions.length, results.size());
+ assertEquals(values.length, results.size());
+ for (int i = 0; i < versions.length; i++) {
+ Cell kv = results.get(i);
+ assertArrayEquals(Bytes.toBytes(index), CellUtil.cloneRow(kv));
+ assertEquals(versions[i], kv.getTimestamp());
+ assertArrayEquals(values[i], CellUtil.cloneValue(kv));
+ }
+ }
+
+ @Test
+ public void testTransactionStateCache() throws Exception {
+ TransactionStateCache cache = new TransactionStateCache();
+ cache.setConf(conf);
+ cache.startAndWait();
+ // verify that the transaction snapshot read matches what we wrote in setupBeforeClass()
+ TransactionVisibilityState cachedSnapshot = cache.getLatestState();
+ assertNotNull(cachedSnapshot);
+ assertEquals(invalidSet, cachedSnapshot.getInvalid());
+ cache.stopAndWait();
+ }
+
+ private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException {
+ long timeout = 5000; // ms
+ do {
+ TransactionVisibilityState state = cache.getLatestState();
+ if (state != null) {
+ return state;
+ }
+ TimeUnit.MILLISECONDS.sleep(100);
+ timeout -= 100;
+ } while (timeout > 0L);
+ LOG.error("Timed out waiting foe transaction state cache");
+ Assert.fail("Timed out waiting foe transaction state cache");
+ return null;
+ }
+
+ private static class LocalRegionServerServices extends MockRegionServerServices {
+ private final ServerName serverName;
+ private ChoreService choreService;
+
+ public LocalRegionServerServices(Configuration conf, ServerName serverName) {
+ super(conf);
+ this.serverName = serverName;
+ this.choreService = new ChoreService(getServerName().toString(), true);
+ }
+
+ @Override
+ public ServerName getServerName() {
+ return serverName;
+ }
+ @Override
+ public ChoreService getChoreService() {
+ return choreService;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/db6ef6d2/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
new file mode 100644
index 0000000..1b02609
--- /dev/null
+++ b/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.AbstractTransactionVisibilityFilterTest;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * HBase 1.1 specific test for filtering logic applied when reading data transactionally.
+ */
+public class TransactionVisibilityFilterTest extends AbstractTransactionVisibilityFilterTest {
+ /**
+ * Test filtering of KeyValues for in-progress and invalid transactions.
+ * @throws Exception
+ */
+ @Test
+ public void testFiltering() throws Exception {
+ TxFilterFactory txFilterFactory = new TxFilterFactory() {
+ @Override
+ public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN);
+ }
+ };
+ runFilteringTest(txFilterFactory,
+ ImmutableList.of(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ Filter.ReturnCode.SKIP,
+ Filter.ReturnCode.SKIP,
+ Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ Filter.ReturnCode.INCLUDE_AND_NEXT_COL));
+ }
+
+ @Test
+ public void testSubFilter() throws Exception {
+ final FilterBase includeFilter = new FilterBase() {
+ @Override
+ public ReturnCode filterKeyValue(Cell ignored) throws IOException {
+ return ReturnCode.INCLUDE;
+ }
+ };
+ TxFilterFactory txFilterFactory = new TxFilterFactory() {
+ @Override
+ public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter);
+ }
+ };
+ runFilteringTest(txFilterFactory,
+ ImmutableList.of(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ Filter.ReturnCode.SKIP,
+ Filter.ReturnCode.SKIP,
+ Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ Filter.ReturnCode.INCLUDE_AND_NEXT_COL));
+
+ final Filter skipFilter = new FilterBase() {
+ @Override
+ public ReturnCode filterKeyValue(Cell ignored) throws IOException {
+ return ReturnCode.SKIP;
+ }
+ };
+ txFilterFactory = new TxFilterFactory() {
+ @Override
+ public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter);
+ }
+ };
+ runFilteringTest(txFilterFactory,
+ ImmutableList.of(Filter.ReturnCode.NEXT_COL,
+ Filter.ReturnCode.NEXT_COL,
+ Filter.ReturnCode.SKIP,
+ Filter.ReturnCode.SKIP,
+ Filter.ReturnCode.NEXT_COL,
+ Filter.ReturnCode.NEXT_COL));
+
+ final Filter includeNextFilter = new FilterBase() {
+ @Override
+ public ReturnCode filterKeyValue(Cell ignored) throws IOException {
+ return ReturnCode.INCLUDE_AND_NEXT_COL;
+ }
+ };
+ txFilterFactory = new TxFilterFactory() {
+ @Override
+ public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter);
+ }
+ };
+ runFilteringTest(txFilterFactory,
+ ImmutableList.of(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ Filter.ReturnCode.SKIP,
+ Filter.ReturnCode.SKIP,
+ Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ Filter.ReturnCode.INCLUDE_AND_NEXT_COL));
+
+ final Filter nextColFilter = new FilterBase() {
+ @Override
+ public ReturnCode filterKeyValue(Cell ignored) throws IOException {
+ return ReturnCode.NEXT_COL;
+ }
+ };
+ txFilterFactory = new TxFilterFactory() {
+ @Override
+ public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
+ return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter);
+ }
+ };
+ runFilteringTest(txFilterFactory,
+ ImmutableList.of(Filter.ReturnCode.NEXT_COL,
+ Filter.ReturnCode.NEXT_COL,
+ Filter.ReturnCode.SKIP,
+ Filter.ReturnCode.SKIP,
+ Filter.ReturnCode.NEXT_COL,
+ Filter.ReturnCode.NEXT_COL));
+
+ }
+
+ @Test
+ public void testSubFilterOverride() throws Exception {
+ final FilterBase includeFilter = new FilterBase() {
+ @Override
+ public ReturnCode filterKeyValue(Cell ignored) throws IOException {
+ return ReturnCode.INCLUDE;
+ }
+ };
+ TxFilterFactory txFilterFactory = new TxFilterFactory() {
+ @Override
+ public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
+ return new CustomTxFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter);
+ }
+ };
+ runFilteringTest(txFilterFactory,
+ ImmutableList.of(Filter.ReturnCode.INCLUDE,
+ Filter.ReturnCode.INCLUDE,
+ Filter.ReturnCode.SKIP,
+ Filter.ReturnCode.SKIP,
+ Filter.ReturnCode.INCLUDE,
+ Filter.ReturnCode.INCLUDE));
+
+ final Filter skipFilter = new FilterBase() {
+ @Override
+ public ReturnCode filterKeyValue(Cell ignored) throws IOException {
+ return ReturnCode.SKIP;
+ }
+ };
+ txFilterFactory = new TxFilterFactory() {
+ @Override
+ public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
+ return new CustomTxFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter);
+ }
+ };
+ runFilteringTest(txFilterFactory,
+ ImmutableList.of(Filter.ReturnCode.NEXT_COL,
+ Filter.ReturnCode.NEXT_COL,
+ Filter.ReturnCode.SKIP,
+ Filter.ReturnCode.SKIP,
+ Filter.ReturnCode.NEXT_COL,
+ Filter.ReturnCode.NEXT_COL));
+
+ final Filter includeNextFilter = new FilterBase() {
+ @Override
+ public ReturnCode filterKeyValue(Cell ignored) throws IOException {
+ return ReturnCode.INCLUDE_AND_NEXT_COL;
+ }
+ };
+ txFilterFactory = new TxFilterFactory() {
+ @Override
+ public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
+ return new CustomTxFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter);
+ }
+ };
+ runFilteringTest(txFilterFactory,
+ ImmutableList.of(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ Filter.ReturnCode.SKIP,
+ Filter.ReturnCode.SKIP,
+ Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ Filter.ReturnCode.INCLUDE_AND_NEXT_COL));
+
+ final Filter nextColFilter = new FilterBase() {
+ @Override
+ public ReturnCode filterKeyValue(Cell ignored) throws IOException {
+ return ReturnCode.NEXT_COL;
+ }
+ };
+ txFilterFactory = new TxFilterFactory() {
+ @Override
+ public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
+ return new CustomTxFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter);
+ }
+ };
+ runFilteringTest(txFilterFactory,
+ ImmutableList.of(Filter.ReturnCode.NEXT_COL,
+ Filter.ReturnCode.NEXT_COL,
+ Filter.ReturnCode.SKIP,
+ Filter.ReturnCode.SKIP,
+ Filter.ReturnCode.NEXT_COL,
+ Filter.ReturnCode.NEXT_COL));
+
+ }
+
+ private void runFilteringTest(TxFilterFactory txFilterFactory,
+ List<Filter.ReturnCode> assertCodes) throws Exception {
+
+ /*
+ * Start and stop some transactions. This will give us a transaction state something like the following
+ * (numbers only reflect ordering, not actual transaction IDs):
+ * 6 - in progress
+ * 5 - committed
+ * 4 - invalid
+ * 3 - in-progress
+ * 2 - committed
+ * 1 - committed
+ *
+ * read ptr = 5
+ * write ptr = 6
+ */
+
+ Transaction tx1 = txManager.startShort();
+ txManager.canCommit(tx1.getTransactionId(), EMPTY_CHANGESET);
+ txManager.commit(tx1.getTransactionId(), tx1.getWritePointer());
+
+ Transaction tx2 = txManager.startShort();
+ txManager.canCommit(tx2.getTransactionId(), EMPTY_CHANGESET);
+ txManager.commit(tx2.getTransactionId(), tx2.getWritePointer());
+
+ Transaction tx3 = txManager.startShort();
+ Transaction tx4 = txManager.startShort();
+ txManager.invalidate(tx4.getTransactionId());
+
+ Transaction tx5 = txManager.startShort();
+ txManager.canCommit(tx5.getTransactionId(), EMPTY_CHANGESET);
+ txManager.commit(tx5.getTransactionId(), tx5.getWritePointer());
+
+ Transaction tx6 = txManager.startShort();
+
+ Map<byte[], Long> ttls = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+ Filter filter = txFilterFactory.getTxFilter(tx6, ttls);
+
+ assertEquals(assertCodes.get(5),
+ filter.filterKeyValue(newKeyValue("row1", "val1", tx6.getTransactionId())));
+ assertEquals(assertCodes.get(4),
+ filter.filterKeyValue(newKeyValue("row1", "val1", tx5.getTransactionId())));
+ assertEquals(assertCodes.get(3),
+ filter.filterKeyValue(newKeyValue("row1", "val1", tx4.getTransactionId())));
+ assertEquals(assertCodes.get(2),
+ filter.filterKeyValue(newKeyValue("row1", "val1", tx3.getTransactionId())));
+ assertEquals(assertCodes.get(1),
+ filter.filterKeyValue(newKeyValue("row1", "val1", tx2.getTransactionId())));
+ assertEquals(assertCodes.get(0),
+ filter.filterKeyValue(newKeyValue("row1", "val1", tx1.getTransactionId())));
+ }
+
+ /**
+ * Test filtering for TTL settings.
+ * @throws Exception
+ */
+ @Test
+ public void testTTLFiltering() throws Exception {
+ Map<byte[], Long> ttls = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+ ttls.put(FAM, 10L);
+ ttls.put(FAM2, 30L);
+ ttls.put(FAM3, 0L);
+
+ Transaction tx = txManager.startShort();
+ long now = tx.getVisibilityUpperBound();
+ Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN);
+ assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now)));
+ assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now - 1 * TxConstants.MAX_TX_PER_MS)));
+ assertEquals(Filter.ReturnCode.NEXT_COL,
+ filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now - 11 * TxConstants.MAX_TX_PER_MS)));
+ assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ filter.filterKeyValue(newKeyValue("row1", FAM2, "val1", now - 11 * TxConstants.MAX_TX_PER_MS)));
+ assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ filter.filterKeyValue(newKeyValue("row1", FAM2, "val1", now - 21 * TxConstants.MAX_TX_PER_MS)));
+ assertEquals(Filter.ReturnCode.NEXT_COL,
+ filter.filterKeyValue(newKeyValue("row1", FAM2, "val1", now - 31 * TxConstants.MAX_TX_PER_MS)));
+ assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", now - 31 * TxConstants.MAX_TX_PER_MS)));
+ assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", now - 1001 * TxConstants.MAX_TX_PER_MS)));
+ assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ filter.filterKeyValue(newKeyValue("row2", FAM, "val1", now)));
+ assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ filter.filterKeyValue(newKeyValue("row2", FAM, "val1", now - 1 * TxConstants.MAX_TX_PER_MS)));
+
+ // Verify ttl for pre-existing, non-transactional data
+ long preNow = now / TxConstants.MAX_TX_PER_MS;
+ assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ filter.filterKeyValue(newKeyValue("row1", FAM, "val1", preNow)));
+ assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ filter.filterKeyValue(newKeyValue("row1", FAM, "val1", preNow - 9L)));
+ assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ filter.filterKeyValue(newKeyValue("row1", FAM, "val1", preNow - 10L)));
+ assertEquals(Filter.ReturnCode.NEXT_COL,
+ filter.filterKeyValue(newKeyValue("row1", FAM, "val1", preNow - 11L)));
+ assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", preNow)));
+ assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", preNow - 9L)));
+ assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", preNow - 10L)));
+ assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
+ filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", preNow - 1001L)));
+ }
+
+ protected KeyValue newKeyValue(String rowkey, String value, long timestamp) {
+ return new KeyValue(Bytes.toBytes(rowkey), FAM, COL, timestamp, Bytes.toBytes(value));
+ }
+
+ protected KeyValue newKeyValue(String rowkey, byte[] family, String value, long timestamp) {
+ return new KeyValue(Bytes.toBytes(rowkey), family, COL, timestamp, Bytes.toBytes(value));
+ }
+
+ private interface TxFilterFactory {
+ Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs);
+ }
+
+ private class CustomTxFilter extends TransactionVisibilityFilter {
+ public CustomTxFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, ScanType scanType,
+ @Nullable Filter cellFilter) {
+ super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter);
+ }
+
+ @Override
+ protected ReturnCode determineReturnCode(ReturnCode txFilterCode, ReturnCode subFilterCode) {
+ switch (subFilterCode) {
+ case INCLUDE:
+ return ReturnCode.INCLUDE;
+ case INCLUDE_AND_NEXT_COL:
+ return ReturnCode.INCLUDE_AND_NEXT_COL;
+ case SKIP:
+ return txFilterCode == ReturnCode.INCLUDE ? ReturnCode.SKIP : ReturnCode.NEXT_COL;
+ default:
+ return subFilterCode;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/db6ef6d2/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java b/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
new file mode 100644
index 0000000..244730a
--- /dev/null
+++ b/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.ImmutableSortedSet;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.AbstractHBaseTableTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Test methods of {@link DataJanitorState}
+ */
+// TODO: Group all the tests that need HBase mini cluster into a suite, so that we start the mini-cluster only once
+public class DataJanitorStateTest extends AbstractHBaseTableTest {
+
+ private TableName pruneStateTable;
+ private DataJanitorState dataJanitorState;
+
+ @Before
+ public void beforeTest() throws Exception {
+ pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+ Table table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
+ // Prune state table is a non-transactional table, hence no transaction co-processor
+ Collections.<String>emptyList());
+ table.close();
+
+ dataJanitorState =
+ new DataJanitorState(new DataJanitorState.TableSupplier() {
+ @Override
+ public Table get() throws IOException {
+ return testUtil.getConnection().getTable(pruneStateTable);
+ }
+ });
+
+ }
+
+ @After
+ public void afterTest() throws Exception {
+ hBaseAdmin.disableTable(pruneStateTable);
+ hBaseAdmin.deleteTable(pruneStateTable);
+ }
+
+ @Test
+ public void testSavePruneUpperBound() throws Exception {
+ int max = 20;
+
+ // Nothing should be present in the beginning
+ Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(10L)));
+
+ // Save some region - prune upper bound values
+ // We should have values for regions 0, 2, 4, 6, ..., max-2 after this
+ for (long i = 0; i < max; i += 2) {
+ dataJanitorState.savePruneUpperBoundForRegion(Bytes.toBytes(i), i);
+ }
+
+ Assert.assertEquals(10L, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(10L)));
+
+ // Verify all the saved values
+ for (long i = 0; i < max; ++i) {
+ long expected = i % 2 == 0 ? i : -1;
+ Assert.assertEquals(expected, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(i)));
+ }
+ // Regions not present should give -1
+ Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(max + 50L)));
+ Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes((max + 10L) * -1)));
+ Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(3L)));
+
+ SortedSet<byte[]> allRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ Map<byte[], Long> expectedMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ for (long i = 0; i < max; ++i) {
+ allRegions.add(Bytes.toBytes(i));
+ if (i % 2 == 0) {
+ expectedMap.put(Bytes.toBytes(i), i);
+ }
+ }
+ Assert.assertEquals(max / 2, expectedMap.size());
+ Assert.assertEquals(expectedMap, dataJanitorState.getPruneUpperBoundForRegions(allRegions));
+
+ SortedSet<byte[]> regions = ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR)
+ .add(Bytes.toBytes((max + 20L) * -1))
+ .add(Bytes.toBytes(6L))
+ .add(Bytes.toBytes(15L))
+ .add(Bytes.toBytes(18L))
+ .add(Bytes.toBytes(max + 33L))
+ .build();
+ expectedMap = ImmutableSortedMap.<byte[], Long>orderedBy(Bytes.BYTES_COMPARATOR)
+ .put(Bytes.toBytes(6L), 6L)
+ .put(Bytes.toBytes(18L), 18L)
+ .build();
+ Assert.assertEquals(expectedMap, dataJanitorState.getPruneUpperBoundForRegions(regions));
+
+ // Delete regions that have prune upper bound before 15 and not in set (4, 8)
+ ImmutableSortedSet<byte[]> excludeRegions =
+ ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR).add(Bytes.toBytes(4L)).add(Bytes.toBytes(8L)).build();
+ dataJanitorState.deletePruneUpperBounds(15, excludeRegions);
+ // Regions 0, 2, 6 and 10 should have been deleted now
+ expectedMap = ImmutableSortedMap.<byte[], Long>orderedBy(Bytes.BYTES_COMPARATOR)
+ .put(Bytes.toBytes(4L), 4L)
+ .put(Bytes.toBytes(8L), 8L)
+ .put(Bytes.toBytes(16L), 16L)
+ .put(Bytes.toBytes(18L), 18L)
+ .build();
+ Assert.assertEquals(expectedMap, dataJanitorState.getPruneUpperBoundForRegions(allRegions));
+ }
+
+ @Test(timeout = 30000L) // The timeout is used to verify the fix for TEPHRA-230, the test will timeout without the fix
+ public void testSaveRegionTime() throws Exception {
+ int maxTime = 100;
+
+ // Nothing should be present in the beginning
+ Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(maxTime));
+
+ // Save regions for time
+ Map<Long, SortedSet<byte[]>> regionsTime = new TreeMap<>();
+ for (long time = 0; time < maxTime; time += 10) {
+ SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ for (long region = 0; region < 10; region += 2) {
+ regions.add(Bytes.toBytes((time * 10) + region));
+ }
+ regionsTime.put(time, regions);
+ dataJanitorState.saveRegionsForTime(time, regions);
+ }
+
+ // Verify saved regions
+ Assert.assertEquals(new TimeRegions(0, regionsTime.get(0L)), dataJanitorState.getRegionsOnOrBeforeTime(0));
+ Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(30));
+ Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25));
+ Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(31));
+ Assert.assertEquals(new TimeRegions(90, regionsTime.get(90L)),
+ dataJanitorState.getRegionsOnOrBeforeTime(maxTime + 1000));
+ Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(-10));
+
+ // Now change the count stored for regions saved at time 0, 30 and 90
+ try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) {
+ dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE), 3);
+ dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE - 30L), 3);
+ dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE - 90L), 0);
+ }
+
+ // Now querying for time 0 should return null, and querying for time 30 should return regions from time 20
+ Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(0));
+ Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(30));
+ Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(35));
+ Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25));
+ // Querying for anything higher than 90 should give 80 (reproduces TEPHRA-230)
+ Assert.assertEquals(new TimeRegions(80, regionsTime.get(80L)),
+ dataJanitorState.getRegionsOnOrBeforeTime(Long.MAX_VALUE));
+
+ // Delete regions saved on or before time 30
+ dataJanitorState.deleteAllRegionsOnOrBeforeTime(30);
+ // Values on or before time 30 should be deleted
+ Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(30));
+ Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(25));
+ // Counts should be deleted for time on or before 30
+ try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) {
+ Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 30));
+ Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 0));
+ }
+ // Values after time 30 should still exist
+ Assert.assertEquals(new TimeRegions(40, regionsTime.get(40L)), dataJanitorState.getRegionsOnOrBeforeTime(40));
+ try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) {
+ Assert.assertEquals(5, dataJanitorState.getRegionCountForTime(stateTable, 40));
+ }
+ }
+
+ @Test
+ public void testSaveInactiveTransactionBoundTime() throws Exception {
+ int maxTime = 100;
+
+ // Nothing should be present in the beginning
+ Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10));
+
+ // Save inactive transaction bounds for various time values
+ for (long time = 0; time < maxTime; time += 10) {
+ dataJanitorState.saveInactiveTransactionBoundForTime(time, time + 2);
+ }
+
+ // Verify written values
+ Assert.assertEquals(2, dataJanitorState.getInactiveTransactionBoundForTime(0));
+ Assert.assertEquals(12, dataJanitorState.getInactiveTransactionBoundForTime(10));
+ Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(15));
+ Assert.assertEquals(92, dataJanitorState.getInactiveTransactionBoundForTime(90));
+ Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(maxTime + 100));
+ Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime((maxTime + 55) * -1L));
+
+ // Delete values saved on or before time 20
+ dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(20);
+ // Values on or before time 20 should be deleted
+ Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(0));
+ Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10));
+ Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(20));
+ // Values after time 20 should still exist
+ Assert.assertEquals(32, dataJanitorState.getInactiveTransactionBoundForTime(30));
+ Assert.assertEquals(92, dataJanitorState.getInactiveTransactionBoundForTime(90));
+ }
+
+ @Test
+ public void testSaveEmptyRegions() throws Exception {
+ // Nothing should be present in the beginning
+ Assert.assertEquals(ImmutableSortedSet.<byte[]>of(), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+
+ byte[] region1 = Bytes.toBytes("region1");
+ byte[] region2 = Bytes.toBytes("region2");
+ byte[] region3 = Bytes.toBytes("region3");
+ byte[] region4 = Bytes.toBytes("region4");
+ SortedSet<byte[]> allRegions = toISet(region1, region2, region3, region4);
+
+ // Now record some empty regions
+ dataJanitorState.saveEmptyRegionForTime(100, region1);
+ dataJanitorState.saveEmptyRegionForTime(110, region1);
+ dataJanitorState.saveEmptyRegionForTime(102, region2);
+ dataJanitorState.saveEmptyRegionForTime(112, region3);
+
+ Assert.assertEquals(toISet(region1, region2, region3),
+ dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+
+ Assert.assertEquals(toISet(region1, region2, region3),
+ dataJanitorState.getEmptyRegionsAfterTime(100, allRegions));
+
+ Assert.assertEquals(toISet(region2, region3),
+ dataJanitorState.getEmptyRegionsAfterTime(100, toISet(region2, region3)));
+
+ Assert.assertEquals(toISet(),
+ dataJanitorState.getEmptyRegionsAfterTime(100, ImmutableSortedSet.<byte[]>of()));
+
+ Assert.assertEquals(toISet(region3),
+ dataJanitorState.getEmptyRegionsAfterTime(110, allRegions));
+
+ Assert.assertEquals(toISet(),
+ dataJanitorState.getEmptyRegionsAfterTime(112, allRegions));
+
+ // Delete empty regions on or before time 110
+ dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(110);
+ // Now only region3 should remain
+ Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+ Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(100, allRegions));
+
+ // Delete empty regions on or before time 150
+ dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(150);
+ // Now nothing should remain
+ Assert.assertEquals(toISet(), dataJanitorState.getEmptyRegionsAfterTime(-1, null));
+ }
+
+ private ImmutableSortedSet<byte[]> toISet(byte[]... args) {
+ ImmutableSortedSet.Builder<byte[]> builder = ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR);
+ for (byte[] arg : args) {
+ builder.add(arg);
+ }
+ return builder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/db6ef6d2/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
new file mode 100644
index 0000000..75ab6d6
--- /dev/null
+++ b/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.ImmutableSortedSet;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.CacheSupplier;
+import org.apache.tephra.coprocessor.TransactionStateCache;
+import org.apache.tephra.hbase.AbstractHBaseTableTest;
+import org.apache.tephra.hbase.TransactionAwareHTable;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.TransactionSnapshot;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.apache.tephra.txprune.TransactionPruningPlugin;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Test invalid list pruning
+ */
+public class InvalidListPruneTest extends AbstractHBaseTableTest {
+ private static final byte[] family = Bytes.toBytes("f1");
+ private static final byte[] qualifier = Bytes.toBytes("col1");
+ private static final int MAX_ROWS = 1000;
+
+ private static TableName txDataTable1;
+ private static TableName pruneStateTable;
+ private static DataJanitorState dataJanitorState;
+
+ // Override AbstractHBaseTableTest.startMiniCluster to setup configuration
+ @BeforeClass
+ public static void startMiniCluster() throws Exception {
+ // Setup the configuration to start HBase cluster with the invalid list pruning enabled
+ conf = HBaseConfiguration.create();
+ conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
+ // Flush prune data to table quickly, so that tests don't need have to wait long to see updates
+ conf.setLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, 0L);
+ AbstractHBaseTableTest.startMiniCluster();
+
+ TransactionStateStorage txStateStorage = new InMemoryTransactionStateStorage();
+ TransactionManager txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
+ txManager.startAndWait();
+
+ // Do some transactional data operations
+ txDataTable1 = TableName.valueOf("invalidListPruneTestTable1");
+ Table hTable = createTable(txDataTable1.getName(), new byte[][]{family}, false,
+ Collections.singletonList(TestTransactionProcessor.class.getName()));
+ try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
+ TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
+ txContext.start();
+ for (int i = 0; i < MAX_ROWS; ++i) {
+ txTable.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, Bytes.toBytes(i)));
+ }
+ txContext.finish();
+ }
+
+ testUtil.flush(txDataTable1);
+ txManager.stopAndWait();
+
+ pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+ dataJanitorState =
+ new DataJanitorState(new DataJanitorState.TableSupplier() {
+ @Override
+ public Table get() throws IOException {
+ return testUtil.getConnection().getTable(pruneStateTable);
+ }
+ });
+
+ }
+
+ @AfterClass
+ public static void shutdownAfterClass() throws Exception {
+ hBaseAdmin.disableTable(txDataTable1);
+ hBaseAdmin.deleteTable(txDataTable1);
+ }
+
+ @Before
+ public void beforeTest() throws Exception {
+ createPruneStateTable();
+ InMemoryTransactionStateCache.setTransactionSnapshot(null);
+ }
+
+ private void createPruneStateTable() throws Exception {
+ Table table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
+ // Prune state table is a non-transactional table, hence no transaction co-processor
+ Collections.<String>emptyList());
+ table.close();
+ }
+
+ @After
+ public void afterTest() throws Exception {
+ // Disable the data table so that prune writer thread gets stopped,
+ // this makes sure that any cached value will not interfere with next test
+ hBaseAdmin.disableTable(txDataTable1);
+ deletePruneStateTable();
+ // Enabling the table enables the prune writer thread again
+ hBaseAdmin.enableTable(txDataTable1);
+ }
+
+ private void deletePruneStateTable() throws Exception {
+ if (hBaseAdmin.tableExists(pruneStateTable)) {
+ hBaseAdmin.disableTable(pruneStateTable);
+ hBaseAdmin.deleteTable(pruneStateTable);
+ }
+ }
+
+ @Test
+ public void testRecordCompactionState() throws Exception {
+ // No prune upper bound initially
+ Assert.assertEquals(-1,
+ dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+
+ // Create a new transaction snapshot
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+ // Run minor compaction
+ testUtil.compact(txDataTable1, false);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+ // No prune upper bound after minor compaction too
+ Assert.assertEquals(-1,
+ dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+
+ // Run major compaction, and verify prune upper bound
+ testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+ Assert.assertEquals(50,
+ dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+
+ // Run major compaction again with same snapshot, prune upper bound should not change
+ testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+ Assert.assertEquals(50,
+ dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+
+ // Create a new transaction snapshot
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(110, 111, 112, ImmutableSet.of(150L),
+ ImmutableSortedMap.of(105L, new TransactionManager.InProgressTx(
+ 100, 30, TransactionManager.InProgressType.SHORT))));
+ Assert.assertEquals(50,
+ dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+
+ // Run major compaction again, now prune upper bound should change
+ testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+ Assert.assertEquals(104,
+ dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+ }
+
+ @Test
+ public void testRecordCompactionStateNoTable() throws Exception {
+ // Create a new transaction snapshot
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+ // Run major compaction, and verify it completes
+ long now = System.currentTimeMillis();
+ testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+ long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
+ Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime),
+ lastMajorCompactionTime >= now);
+ }
+
+ @Test
+ public void testRecordCompactionStateNoTxSnapshot() throws Exception {
+ // Test recording state without having a transaction snapshot to make sure we don't disrupt
+ // major compaction in that case
+ InMemoryTransactionStateCache.setTransactionSnapshot(null);
+ // Run major compaction, and verify it completes
+ long now = System.currentTimeMillis();
+ testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+ long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
+ Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime),
+ lastMajorCompactionTime >= now);
+ }
+
+ @Test
+ public void testPruneUpperBound() throws Exception {
+ TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
+ transactionPruningPlugin.initialize(conf);
+
+ try {
+ // Run without a transaction snapshot first
+ long now1 = 200;
+ long inactiveTxTimeNow1 = 150 * TxConstants.MAX_TX_PER_MS;
+ long expectedPruneUpperBound1 = -1;
+ // fetch prune upper bound
+ long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+ Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound1);
+
+ TimeRegions expectedRegions1 =
+ new TimeRegions(now1,
+ ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR)
+ .add(getRegionName(txDataTable1, Bytes.toBytes(0)))
+ .build());
+ // Assert prune state is recorded correctly
+ Assert.assertEquals(expectedRegions1, dataJanitorState.getRegionsOnOrBeforeTime(now1));
+ Assert.assertEquals(expectedPruneUpperBound1,
+ dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+ Assert.assertEquals(inactiveTxTimeNow1, dataJanitorState.getInactiveTransactionBoundForTime(now1));
+
+ // Run prune complete
+ transactionPruningPlugin.pruneComplete(now1, expectedPruneUpperBound1);
+
+ // Assert prune state was cleaned up correctly based on the prune time
+ Assert.assertEquals(expectedRegions1, dataJanitorState.getRegionsOnOrBeforeTime(now1));
+ Assert.assertEquals(expectedPruneUpperBound1,
+ dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+ Assert.assertEquals(inactiveTxTimeNow1, dataJanitorState.getInactiveTransactionBoundForTime(now1));
+
+ // Create a new transaction snapshot, and run major compaction on txDataTable1
+ // And run all assertions again
+ long now2 = 300;
+ long inactiveTxTimeNow2 = 250 * TxConstants.MAX_TX_PER_MS;
+ long expectedPruneUpperBound2 = 200 * TxConstants.MAX_TX_PER_MS;
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(expectedPruneUpperBound2, expectedPruneUpperBound2, expectedPruneUpperBound2,
+ ImmutableSet.of(expectedPruneUpperBound2),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+ TimeRegions expectedRegions2 =
+ new TimeRegions(now2,
+ ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR)
+ .add(getRegionName(txDataTable1, Bytes.toBytes(0)))
+ .build());
+ testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+ long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+ Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
+
+ Assert.assertEquals(expectedRegions2, dataJanitorState.getRegionsOnOrBeforeTime(now2));
+ Assert.assertEquals(expectedPruneUpperBound2,
+ dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+ Assert.assertEquals(inactiveTxTimeNow2, dataJanitorState.getInactiveTransactionBoundForTime(now2));
+ Assert.assertEquals(expectedRegions1, dataJanitorState.getRegionsOnOrBeforeTime(now1));
+ Assert.assertEquals(inactiveTxTimeNow1, dataJanitorState.getInactiveTransactionBoundForTime(now1));
+
+ transactionPruningPlugin.pruneComplete(now2, pruneUpperBound2);
+ Assert.assertEquals(expectedRegions2, dataJanitorState.getRegionsOnOrBeforeTime(now2));
+ Assert.assertEquals(expectedPruneUpperBound2,
+ dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+ Assert.assertEquals(inactiveTxTimeNow2, dataJanitorState.getInactiveTransactionBoundForTime(now2));
+ Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(now1));
+ Assert.assertEquals(expectedPruneUpperBound1, dataJanitorState.getInactiveTransactionBoundForTime(now1));
+
+ } finally {
+ transactionPruningPlugin.destroy();
+ }
+ }
+
+ @Test
+ public void testPruneEmptyTable() throws Exception {
+ // Make sure that empty tables do not block the progress of pruning
+
+ // Create an empty table
+ TableName txEmptyTable = TableName.valueOf("emptyPruneTestTable");
+ Table emptyHTable = createTable(txEmptyTable.getName(), new byte[][]{family}, false,
+ Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+ TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
+ transactionPruningPlugin.initialize(conf);
+
+ try {
+ long now1 = System.currentTimeMillis();
+ long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+ long noPruneUpperBound = -1;
+ long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS;
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1,
+ ImmutableSet.of(expectedPruneUpperBound1),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+ testUtil.compact(txEmptyTable, true);
+ testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+
+ // fetch prune upper bound, there should be no prune upper bound since txEmptyTable cannot be compacted
+ long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+ Assert.assertEquals(noPruneUpperBound, pruneUpperBound1);
+ transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound);
+
+ // Now flush the empty table, this will record the table region as empty, and then pruning will continue
+ hBaseAdmin.flush(txEmptyTable);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+
+ // fetch prune upper bound, again, this time it should work
+ pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+ Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound1);
+ transactionPruningPlugin.pruneComplete(now1, expectedPruneUpperBound1);
+
+ // Now add some data to the empty table
+ // (adding data non-transactionally is okay too, we just need some data for the compaction to run)
+ emptyHTable.put(new Put(Bytes.toBytes(1)).addColumn(family, qualifier, Bytes.toBytes(1)));
+ emptyHTable.close();
+
+ // Now run another compaction on txDataTable1 with an updated tx snapshot
+ long now2 = System.currentTimeMillis();
+ long inactiveTxTimeNow2 = (now2 - 150) * TxConstants.MAX_TX_PER_MS;
+ long expectedPruneUpperBound2 = (now2 - 200) * TxConstants.MAX_TX_PER_MS;
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(expectedPruneUpperBound2, expectedPruneUpperBound2, expectedPruneUpperBound2,
+ ImmutableSet.of(expectedPruneUpperBound2),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+ testUtil.flush(txEmptyTable);
+ testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+
+ // Running a prune now should still return min(inactiveTxTimeNow1, expectedPruneUpperBound1) since
+ // txEmptyTable is no longer empty. This information is returned since the txEmptyTable was recorded as being
+ // empty in the previous run with inactiveTxTimeNow1
+ long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+ Assert.assertEquals(inactiveTxTimeNow1, pruneUpperBound2);
+ transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1);
+
+ // However, after compacting txEmptyTable we should get the latest upper bound
+ testUtil.flush(txEmptyTable);
+ testUtil.compact(txEmptyTable, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+ pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+ Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
+ transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound2);
+ } finally {
+ transactionPruningPlugin.destroy();
+ hBaseAdmin.disableTable(txEmptyTable);
+ hBaseAdmin.deleteTable(txEmptyTable);
+ }
+ }
+
+ @Test
+ public void testPruneTransientTable() throws Exception {
+ // Make sure that transient tables do not block the progress of pruning
+
+ // Create a temp table
+ TableName txTempTable = TableName.valueOf("tempTable");
+ createTable(txTempTable.getName(), new byte[][]{family}, false,
+ Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+ TableName txDataTable2 = null;
+
+ TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
+ transactionPruningPlugin.initialize(conf);
+
+ try {
+ long now1 = System.currentTimeMillis();
+ long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+ long noPruneUpperBound = -1;
+ long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS;
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1,
+ ImmutableSet.of(expectedPruneUpperBound1),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+
+ // fetch prune upper bound, there should be no prune upper bound since nothing has been compacted yet.
+ // This run is only to store the initial set of regions
+ long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+ Assert.assertEquals(noPruneUpperBound, pruneUpperBound1);
+ transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound);
+
+ // Now delete the transient table
+ hBaseAdmin.disableTable(txTempTable);
+ hBaseAdmin.deleteTable(txTempTable);
+ txTempTable = null;
+
+ // Compact the data table now
+ testUtil.compact(txDataTable1, true);
+ // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+ TimeUnit.SECONDS.sleep(2);
+
+ // Create a new table that will not be compacted
+ txDataTable2 = TableName.valueOf("invalidListPruneTestTable2");
+ createTable(txDataTable2.getName(), new byte[][]{family}, false,
+ Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+ // fetch prune upper bound, there should be a prune upper bound even though txTempTable does not exist anymore,
+ // and txDataTable2 has not been compacted/flushed yet
+ long now2 = System.currentTimeMillis();
+ long inactiveTxTimeNow2 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+ long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+ Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound2);
+ transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1);
+ } finally {
+ transactionPruningPlugin.destroy();
+ if (txDataTable2 != null) {
+ hBaseAdmin.disableTable(txDataTable2);
+ hBaseAdmin.deleteTable(txDataTable2);
+ }
+ if (txTempTable != null) {
+ hBaseAdmin.disableTable(txTempTable);
+ hBaseAdmin.deleteTable(txTempTable);
+ }
+ }
+ }
+
+ private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException {
+ HRegionLocation regionLocation =
+ testUtil.getConnection().getRegionLocator(dataTable).getRegionLocation(row);
+ return regionLocation.getRegionInfo().getRegionName();
+ }
+
+ /**
+ * A transaction co-processor that uses in-memory {@link TransactionSnapshot} for testing
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static class TestTransactionProcessor extends TransactionProcessor {
+ private static final AtomicLong lastMajorCompactionTime = new AtomicLong(-1);
+
+ @Override
+ protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+ return new CacheSupplier<TransactionStateCache>() {
+ @Override
+ public TransactionStateCache get() {
+ return new InMemoryTransactionStateCache();
+ }
+
+ @Override
+ public void release() {
+ // no-op
+ }
+ };
+ }
+
+ @Override
+ public void postCompact(
+ org.apache.hadoop.hbase.coprocessor.ObserverContext<RegionCoprocessorEnvironment> c,
+ Store store, StoreFile resultFile,
+ org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker tracker,
+ CompactionRequest request) throws IOException {
+ super.postCompact(c, store, resultFile, tracker, request);
+ lastMajorCompactionTime.set(System.currentTimeMillis());
+ }
+ }
+
+ /**
+ * Used to supply in-memory {@link TransactionSnapshot} to {@link TestTransactionProcessor} for testing
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static class InMemoryTransactionStateCache extends TransactionStateCache {
+ private static TransactionVisibilityState transactionSnapshot;
+
+ public static void setTransactionSnapshot(TransactionVisibilityState transactionSnapshot) {
+ InMemoryTransactionStateCache.transactionSnapshot = transactionSnapshot;
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ // Nothing to do
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ // Nothing to do
+ }
+
+ @Override
+ public TransactionVisibilityState getLatestState() {
+ return transactionSnapshot;
+ }
+ }
+
+ @SuppressWarnings("WeakerAccess")
+ public static class TestTransactionPruningPlugin extends HBaseTransactionPruningPlugin {
+ @Override
+ protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
+ return tableDescriptor.hasCoprocessor(TestTransactionProcessor.class.getName());
+ }
+ }
+}