Posted to commits@tephra.apache.org by go...@apache.org on 2017/01/19 20:13:35 UTC
[2/5] incubator-tephra git commit: Porting Pruning changes to
hbase-compat-0.96, hbase-compat-0.98, hbase-compat-1.0, hbase-compat-1.0-cdh
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/abf34e5f/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
index b781a9a..0dc483d 100644
--- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
@@ -19,21 +19,14 @@ package org.apache.tephra.hbase;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.OperationWithAttributes;
@@ -76,6 +69,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -89,14 +83,11 @@ import static org.junit.Assert.fail;
/**
* Tests for TransactionAwareHTables.
*/
-public class TransactionAwareHTableTest {
+public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTableTest.class);
- private static HBaseTestingUtility testUtil;
- private static HBaseAdmin hBaseAdmin;
- private static TransactionStateStorage txStateStorage;
- private static TransactionManager txManager;
- private static Configuration conf;
+ static TransactionStateStorage txStateStorage;
+ static TransactionManager txManager;
private TransactionContext transactionContext;
private TransactionAwareHTable transactionAwareHTable;
private HTable hTable;
@@ -146,23 +137,6 @@ public class TransactionAwareHTableTest {
@BeforeClass
public static void setupBeforeClass() throws Exception {
- testUtil = new HBaseTestingUtility();
- conf = testUtil.getConfiguration();
-
- // Tune down the connection thread pool size
- conf.setInt("hbase.hconnection.threads.core", 5);
- conf.setInt("hbase.hconnection.threads.max", 10);
- // Tunn down handler threads in regionserver
- conf.setInt("hbase.regionserver.handler.count", 10);
-
- // Set to random port
- conf.setInt("hbase.master.port", 0);
- conf.setInt("hbase.master.info.port", 0);
- conf.setInt("hbase.regionserver.port", 0);
- conf.setInt("hbase.regionserver.info.port", 0);
-
- testUtil.startMiniCluster();
- hBaseAdmin = testUtil.getHBaseAdmin();
txStateStorage = new InMemoryTransactionStateStorage();
txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
txManager.startAndWait();
@@ -170,8 +144,9 @@ public class TransactionAwareHTableTest {
@AfterClass
public static void shutdownAfterClass() throws Exception {
- testUtil.shutdownMiniCluster();
- hBaseAdmin.close();
+ if (txManager != null) {
+ txManager.stopAndWait();
+ }
}
@Before
@@ -187,34 +162,6 @@ public class TransactionAwareHTableTest {
hBaseAdmin.deleteTable(TestBytes.table);
}
- private HTable createTable(byte[] tableName, byte[][] columnFamilies) throws Exception {
- return createTable(tableName, columnFamilies, false, Collections.<String>emptyList());
- }
-
- private HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
- List<String> coprocessors) throws Exception {
- HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
- for (byte[] family : columnFamilies) {
- HColumnDescriptor columnDesc = new HColumnDescriptor(family);
- columnDesc.setMaxVersions(Integer.MAX_VALUE);
- columnDesc.setValue(TxConstants.PROPERTY_TTL, String.valueOf(100000)); // in millis
- desc.addFamily(columnDesc);
- }
- if (existingData) {
- desc.setValue(TxConstants.READ_NON_TX_DATA, "true");
- }
- // Divide individually to prevent any overflow
- int priority = Coprocessor.PRIORITY_USER;
- desc.addCoprocessor(TransactionProcessor.class.getName(), null, priority, null);
- // order in list is the same order that coprocessors will be invoked
- for (String coprocessor : coprocessors) {
- desc.addCoprocessor(coprocessor, null, ++priority, null);
- }
- hBaseAdmin.createTable(desc);
- testUtil.waitTableAvailable(tableName, 5000);
- return new HTable(testUtil.getConfiguration(), tableName);
- }
-
/**
* Test transactional put and get requests.
*
@@ -409,7 +356,7 @@ public class TransactionAwareHTableTest {
public void testAttributesPreserved() throws Exception {
HTable hTable = createTable(Bytes.toBytes("TestAttributesPreserved"),
new byte[][]{TestBytes.family, TestBytes.family2}, false,
- Lists.newArrayList(TestRegionObserver.class.getName()));
+ Lists.newArrayList(TransactionProcessor.class.getName(), TestRegionObserver.class.getName()));
try {
TransactionAwareHTable txTable = new TransactionAwareHTable(hTable);
TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
@@ -1123,7 +1070,7 @@ public class TransactionAwareHTableTest {
TransactionAwareHTable txTable =
new TransactionAwareHTable(createTable(Bytes.toBytes("testExistingData"), new byte[][]{TestBytes.family}, true,
- Collections.<String>emptyList()));
+ Collections.singletonList(TransactionProcessor.class.getName())));
TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
// Add some pre-existing, non-transactional data
@@ -1272,8 +1219,9 @@ public class TransactionAwareHTableTest {
@Test
public void testVisibilityAll() throws Exception {
- HTable nonTxTable = createTable(Bytes.toBytes("testVisibilityAll"),
- new byte[][]{TestBytes.family, TestBytes.family2}, true, Collections.<String>emptyList());
+ HTable nonTxTable =
+ createTable(Bytes.toBytes("testVisibilityAll"), new byte[][]{TestBytes.family, TestBytes.family2},
+ true, Collections.singletonList(TransactionProcessor.class.getName()));
TransactionAwareHTable txTable =
new TransactionAwareHTable(nonTxTable,
TxConstants.ConflictDetection.ROW); // ROW conflict detection to verify family deletes
@@ -1549,6 +1497,66 @@ public class TransactionAwareHTableTest {
transactionContext.finish();
}
+ @Test
+ public void testTxLifetime() throws Exception {
+ // Add some initial values
+ transactionContext.start();
+ Put put = new Put(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+ transactionAwareHTable.put(put);
+ put = new Put(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2, TestBytes.value);
+ transactionAwareHTable.put(put);
+ transactionContext.finish();
+
+ // Simulate writing with a transaction past its max lifetime
+ transactionContext.start();
+ Transaction currentTx = transactionContext.getCurrentTransaction();
+ Assert.assertNotNull(currentTx);
+
+ // Create a transaction that is past the max lifetime
+ long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
+ TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
+ long oldTxId = currentTx.getTransactionId() - ((txMaxLifetimeMillis + 10000) * TxConstants.MAX_TX_PER_MS);
+ Transaction oldTx = new Transaction(currentTx.getReadPointer(), oldTxId,
+ currentTx.getInvalids(), currentTx.getInProgress(),
+ currentTx.getFirstShortInProgress());
+ transactionAwareHTable.updateTx(oldTx);
+ // Put with the old transaction should fail
+ put = new Put(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+ try {
+ transactionAwareHTable.put(put);
+ Assert.fail("Excepted exception with old transaction!");
+ } catch (IOException e) {
+ // Expected exception
+ }
+
+ // Delete with the old transaction should also fail
+ Delete delete = new Delete(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier);
+ try {
+ transactionAwareHTable.delete(delete);
+ Assert.fail("Excepted exception with old transaction!");
+ } catch (IOException e) {
+ // Expected exception
+ }
+
+ // Now update the table to use the current transaction
+ transactionAwareHTable.updateTx(currentTx);
+ put = new Put(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier2, TestBytes.value);
+ transactionAwareHTable.put(put);
+ delete = new Delete(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2);
+ transactionAwareHTable.delete(delete);
+
+ // Verify values with the same transaction since we cannot commit the old transaction
+ verifyRow(transactionAwareHTable,
+ new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier), TestBytes.value);
+ verifyRow(transactionAwareHTable,
+ new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2), null);
+ verifyRow(transactionAwareHTable,
+ new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier), null);
+ verifyRow(transactionAwareHTable,
+ new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier2), TestBytes.value);
+ transactionContext.finish();
+ }
+
/**
* Tests that transaction co-processor works with older clients
*
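A quick note on the arithmetic in testTxLifetime above: the old transaction ID is built by subtracting (txMaxLifetimeMillis + 10000) * TxConstants.MAX_TX_PER_MS from the current ID, which only works because a Tephra transaction ID encodes its start time scaled by MAX_TX_PER_MS. Below is a minimal, self-contained sketch of that relationship and of the server-side check it is meant to trip (ensureValidTxLifetime in TransactionProcessor, further down in this commit). The constant values and the division used for the start time are illustrative assumptions, not the real TxConstants/TxUtils definitions.

import java.util.concurrent.TimeUnit;

public class TxLifetimeSketch {
  // Illustrative placeholders; the real values come from TxConstants.MAX_TX_PER_MS and
  // TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME.
  static final long MAX_TX_PER_MS = 1_000_000L;
  static final long TX_MAX_LIFETIME_MILLIS = TimeUnit.DAYS.toMillis(1);

  // Mirrors the ensureValidTxLifetime check: a write is rejected once the transaction's
  // start time plus the configured lifetime lies in the past.
  static boolean hasValidLifetime(long transactionId, long nowMillis) {
    long startTimeMillis = transactionId / MAX_TX_PER_MS; // assumed equivalent of TxUtils.getTimestamp(txId)
    return startTimeMillis + TX_MAX_LIFETIME_MILLIS > nowMillis;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    long freshTxId = now * MAX_TX_PER_MS;
    long oldTxId = freshTxId - (TX_MAX_LIFETIME_MILLIS + 10_000) * MAX_TX_PER_MS; // as in testTxLifetime
    System.out.println(hasValidLifetime(freshTxId, now)); // true  -> put/delete goes through
    System.out.println(hasValidLifetime(oldTxId, now));   // false -> DoNotRetryIOException on the server
  }
}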
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/abf34e5f/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
new file mode 100644
index 0000000..402892f
--- /dev/null
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.ImmutableSortedSet;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.AbstractHBaseTableTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Test methods of {@link DataJanitorState}
+ */
+// TODO: Group all the tests that need HBase mini cluster into a suite, so that we start the mini-cluster only once
+public class DataJanitorStateTest extends AbstractHBaseTableTest {
+
+ private TableName pruneStateTable;
+ private DataJanitorState dataJanitorState;
+
+ @Before
+ public void beforeTest() throws Exception {
+ pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+ HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
+ // Prune state table is a non-transactional table, hence no transaction co-processor
+ Collections.<String>emptyList());
+ table.close();
+
+ dataJanitorState =
+ new DataJanitorState(new DataJanitorState.TableSupplier() {
+ @Override
+ public Table get() throws IOException {
+ return testUtil.getConnection().getTable(pruneStateTable);
+ }
+ });
+
+ }
+
+ @After
+ public void afterTest() throws Exception {
+ hBaseAdmin.disableTable(pruneStateTable);
+ hBaseAdmin.deleteTable(pruneStateTable);
+ }
+
+ @Test
+ public void testSavePruneUpperBound() throws Exception {
+ int max = 20;
+
+ // Nothing should be present in the beginning
+ Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(10L)));
+
+ // Save some region - prune upper bound values
+ // We should have values for regions 0, 2, 4, 6, ..., max-2 after this
+ for (long i = 0; i < max; i += 2) {
+ dataJanitorState.savePruneUpperBoundForRegion(Bytes.toBytes(i), i);
+ }
+
+ Assert.assertEquals(10L, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(10L)));
+
+ // Verify all the saved values
+ for (long i = 0; i < max; ++i) {
+ long expected = i % 2 == 0 ? i : -1;
+ Assert.assertEquals(expected, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(i)));
+ }
+ // Regions not present should give -1
+ Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(max + 50L)));
+ Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes((max + 10L) * -1)));
+ Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(3L)));
+
+ SortedSet<byte[]> allRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ Map<byte[], Long> expectedMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ for (long i = 0; i < max; ++i) {
+ allRegions.add(Bytes.toBytes(i));
+ if (i % 2 == 0) {
+ expectedMap.put(Bytes.toBytes(i), i);
+ }
+ }
+ Assert.assertEquals(max / 2, expectedMap.size());
+ Assert.assertEquals(expectedMap, dataJanitorState.getPruneUpperBoundForRegions(allRegions));
+
+ SortedSet<byte[]> regions = ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR)
+ .add(Bytes.toBytes((max + 20L) * -1))
+ .add(Bytes.toBytes(6L))
+ .add(Bytes.toBytes(15L))
+ .add(Bytes.toBytes(18L))
+ .add(Bytes.toBytes(max + 33L))
+ .build();
+ expectedMap = ImmutableSortedMap.<byte[], Long>orderedBy(Bytes.BYTES_COMPARATOR)
+ .put(Bytes.toBytes(6L), 6L)
+ .put(Bytes.toBytes(18L), 18L)
+ .build();
+ Assert.assertEquals(expectedMap, dataJanitorState.getPruneUpperBoundForRegions(regions));
+
+ // Delete regions that have prune upper bound before 15 and not in set (4, 8)
+ ImmutableSortedSet<byte[]> excludeRegions =
+ ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR).add(Bytes.toBytes(4L)).add(Bytes.toBytes(8L)).build();
+ dataJanitorState.deletePruneUpperBounds(15, excludeRegions);
+ // Regions 0, 2, 6, 10, 12 and 14 should have been deleted now
+ expectedMap = ImmutableSortedMap.<byte[], Long>orderedBy(Bytes.BYTES_COMPARATOR)
+ .put(Bytes.toBytes(4L), 4L)
+ .put(Bytes.toBytes(8L), 8L)
+ .put(Bytes.toBytes(16L), 16L)
+ .put(Bytes.toBytes(18L), 18L)
+ .build();
+ Assert.assertEquals(expectedMap, dataJanitorState.getPruneUpperBoundForRegions(allRegions));
+ }
+
+ @Test
+ public void testSaveRegionTime() throws Exception {
+ int maxTime = 100;
+
+ // Nothing should be present in the beginning
+ Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(maxTime));
+
+ // Save regions for time
+ Map<Long, SortedSet<byte[]>> regionsTime = new TreeMap<>();
+ for (long time = 0; time < maxTime; time += 10) {
+ SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ for (long region = 0; region < 10; region += 2) {
+ regions.add(Bytes.toBytes((time * 10) + region));
+ }
+ regionsTime.put(time, regions);
+ dataJanitorState.saveRegionsForTime(time, regions);
+ }
+
+ // Verify saved regions
+ Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(30));
+ Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25));
+ Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(31));
+ Assert.assertEquals(new TimeRegions(90, regionsTime.get(90L)),
+ dataJanitorState.getRegionsOnOrBeforeTime(maxTime + 1000));
+ Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(-10));
+
+ // Delete regions saved on or before time 30
+ dataJanitorState.deleteAllRegionsOnOrBeforeTime(30);
+ // Values on or before time 30 should be deleted
+ Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(30));
+ Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(25));
+ // Values after time 30 should still exist
+ Assert.assertEquals(new TimeRegions(40, regionsTime.get(40L)), dataJanitorState.getRegionsOnOrBeforeTime(40));
+ }
+
+ @Test
+ public void testSaveInactiveTransactionBoundTime() throws Exception {
+ int maxTime = 100;
+
+ // Nothing should be present in the beginning
+ Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10));
+
+ // Save inactive transaction bounds for various time values
+ for (long time = 0; time < maxTime; time += 10) {
+ dataJanitorState.saveInactiveTransactionBoundForTime(time, time + 2);
+ }
+
+ // Verify written values
+ Assert.assertEquals(2, dataJanitorState.getInactiveTransactionBoundForTime(0));
+ Assert.assertEquals(12, dataJanitorState.getInactiveTransactionBoundForTime(10));
+ Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(15));
+ Assert.assertEquals(92, dataJanitorState.getInactiveTransactionBoundForTime(90));
+ Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(maxTime + 100));
+ Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime((maxTime + 55) * -1L));
+
+ // Delete values saved on or before time 20
+ dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(20);
+ // Values on or before time 20 should be deleted
+ Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(0));
+ Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10));
+ Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(20));
+ // Values after time 20 should still exist
+ Assert.assertEquals(32, dataJanitorState.getInactiveTransactionBoundForTime(30));
+ Assert.assertEquals(92, dataJanitorState.getInactiveTransactionBoundForTime(90));
+ }
+}
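The getRegionsOnOrBeforeTime assertions in testSaveRegionTime above boil down to a floor lookup over the saved times: a query at 25 returns the set saved at 20, and a query earlier than anything saved returns null. The following is a minimal sketch of that semantics, assuming the state table orders times so that a single forward scan lands on the greatest saved time not exceeding the query (the real implementation uses an inverted-time row key); the TreeMap below only models the behaviour and is not how DataJanitorState stores it.

import java.util.Map;
import java.util.TreeMap;

public class OnOrBeforeTimeSketch {
  public static void main(String[] args) {
    // Save region sets for times 0, 10, ..., 90, like testSaveRegionTime does.
    TreeMap<Long, String> regionsByTime = new TreeMap<>();
    for (long time = 0; time < 100; time += 10) {
      regionsByTime.put(time, "regions-saved-at-" + time);
    }
    // floorEntry models getRegionsOnOrBeforeTime: greatest saved time <= the queried time.
    Map.Entry<Long, String> at25 = regionsByTime.floorEntry(25L);
    Map.Entry<Long, String> at30 = regionsByTime.floorEntry(30L);
    System.out.println(at25); // 20=regions-saved-at-20
    System.out.println(at30); // 30=regions-saved-at-30
    System.out.println(regionsByTime.floorEntry(-10L)); // null -> nothing saved that early
  }
}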
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/abf34e5f/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
new file mode 100644
index 0000000..310c710
--- /dev/null
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.ImmutableSortedSet;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.TransactionStateCache;
+import org.apache.tephra.hbase.AbstractHBaseTableTest;
+import org.apache.tephra.hbase.TransactionAwareHTable;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.TransactionSnapshot;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.apache.tephra.txprune.TransactionPruningPlugin;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Test invalid list pruning
+ */
+public class InvalidListPruneTest extends AbstractHBaseTableTest {
+ private static final byte[] family = Bytes.toBytes("f1");
+ private static final byte[] qualifier = Bytes.toBytes("col1");
+ private static final int MAX_ROWS = 1000;
+
+ private static TableName txDataTable1;
+ private static TableName pruneStateTable;
+
+ // Override AbstractHBaseTableTest.startMiniCluster to setup configuration
+ @BeforeClass
+ public static void startMiniCluster() throws Exception {
+ // Setup the configuration to start HBase cluster with the invalid list pruning enabled
+ conf = HBaseConfiguration.create();
+ conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
+ AbstractHBaseTableTest.startMiniCluster();
+
+ TransactionStateStorage txStateStorage = new InMemoryTransactionStateStorage();
+ TransactionManager txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
+ txManager.startAndWait();
+
+ // Do some transactional data operations
+ txDataTable1 = TableName.valueOf("invalidListPruneTestTable1");
+ HTable hTable = createTable(txDataTable1.getName(), new byte[][]{family}, false,
+ Collections.singletonList(TestTransactionProcessor.class.getName()));
+ try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
+ TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
+ txContext.start();
+ for (int i = 0; i < MAX_ROWS; ++i) {
+ txTable.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, Bytes.toBytes(i)));
+ }
+ txContext.finish();
+ }
+
+ testUtil.flush(txDataTable1);
+ txManager.stopAndWait();
+
+ pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+ }
+
+ @AfterClass
+ public static void shutdownAfterClass() throws Exception {
+ hBaseAdmin.disableTable(txDataTable1);
+ hBaseAdmin.deleteTable(txDataTable1);
+ }
+
+ @Before
+ public void beforeTest() throws Exception {
+ createPruneStateTable();
+ InMemoryTransactionStateCache.setTransactionSnapshot(null);
+ }
+
+ private void createPruneStateTable() throws Exception {
+ HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
+ // Prune state table is a non-transactional table, hence no transaction co-processor
+ Collections.<String>emptyList());
+ table.close();
+ }
+
+ @After
+ public void afterTest() throws Exception {
+ deletePruneStateTable();
+ }
+
+ private void deletePruneStateTable() throws Exception {
+ if (hBaseAdmin.tableExists(pruneStateTable)) {
+ hBaseAdmin.disableTable(pruneStateTable);
+ hBaseAdmin.deleteTable(pruneStateTable);
+ }
+ }
+
+ @Test
+ public void testRecordCompactionState() throws Exception {
+ DataJanitorState dataJanitorState =
+ new DataJanitorState(new DataJanitorState.TableSupplier() {
+ @Override
+ public Table get() throws IOException {
+ return testUtil.getConnection().getTable(pruneStateTable);
+ }
+ });
+
+ // No prune upper bound initially
+ Assert.assertEquals(-1,
+ dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+
+ // Create a new transaction snapshot
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+ // Run minor compaction
+ testUtil.compact(txDataTable1, false);
+ // No prune upper bound after minor compaction too
+ Assert.assertEquals(-1,
+ dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+
+ // Run major compaction, and verify prune upper bound
+ testUtil.compact(txDataTable1, true);
+ Assert.assertEquals(50,
+ dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+
+ // Run major compaction again with same snapshot, prune upper bound should not change
+ testUtil.compact(txDataTable1, true);
+ Assert.assertEquals(50,
+ dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+
+ // Create a new transaction snapshot
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(110, 111, 112, ImmutableSet.of(150L),
+ ImmutableSortedMap.of(105L, new TransactionManager.InProgressTx(
+ 100, 30, TransactionManager.InProgressType.SHORT))));
+ Assert.assertEquals(50,
+ dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+
+ // Run major compaction again, now prune upper bound should change
+ testUtil.compact(txDataTable1, true);
+ Assert.assertEquals(104,
+ dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+ }
+
+ @Test
+ public void testRecordCompactionStateNoTable() throws Exception {
+ // To make sure we don't disrupt major compaction when the prune state table is not present, delete the
+ // prune state table and make sure a major compaction still succeeds
+ deletePruneStateTable();
+
+ // Create a new transaction snapshot
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+ // Run major compaction, and verify it completes
+ long now = System.currentTimeMillis();
+ testUtil.compact(txDataTable1, true);
+ long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
+ Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime),
+ lastMajorCompactionTime >= now);
+ }
+
+ @Test
+ public void testRecordCompactionStateNoTxSnapshot() throws Exception {
+ // Test recording state without having a transaction snapshot to make sure we don't disrupt
+ // major compaction in that case
+ InMemoryTransactionStateCache.setTransactionSnapshot(null);
+ // Run major compaction, and verify it completes
+ long now = System.currentTimeMillis();
+ testUtil.compact(txDataTable1, true);
+ long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
+ Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime),
+ lastMajorCompactionTime >= now);
+ }
+
+ @Test
+ public void testPruneUpperBound() throws Exception {
+ DataJanitorState dataJanitorState =
+ new DataJanitorState(new DataJanitorState.TableSupplier() {
+ @Override
+ public Table get() throws IOException {
+ return testUtil.getConnection().getTable(pruneStateTable);
+ }
+ });
+
+ TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
+ transactionPruningPlugin.initialize(conf);
+ try {
+ // Run without a transaction snapshot first
+ long now1 = 200;
+ long inactiveTxTimeNow1 = 150 * TxConstants.MAX_TX_PER_MS;
+ long expectedPruneUpperBound1 = -1;
+ // fetch prune upper bound
+ long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+ Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound1);
+
+ TimeRegions expectedRegions1 =
+ new TimeRegions(now1,
+ ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR)
+ .add(getRegionName(txDataTable1, Bytes.toBytes(0)))
+ .build());
+ // Assert prune state is recorded correctly
+ Assert.assertEquals(expectedRegions1, dataJanitorState.getRegionsOnOrBeforeTime(now1));
+ Assert.assertEquals(expectedPruneUpperBound1,
+ dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+ Assert.assertEquals(inactiveTxTimeNow1, dataJanitorState.getInactiveTransactionBoundForTime(now1));
+
+ // Run prune complete
+ transactionPruningPlugin.pruneComplete(now1, expectedPruneUpperBound1);
+
+ // Assert prune state was cleaned up correctly based on the prune time
+ Assert.assertEquals(expectedRegions1, dataJanitorState.getRegionsOnOrBeforeTime(now1));
+ Assert.assertEquals(expectedPruneUpperBound1,
+ dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+ Assert.assertEquals(inactiveTxTimeNow1, dataJanitorState.getInactiveTransactionBoundForTime(now1));
+
+ // Create a new transaction snapshot, and run major compaction on txDataTable1
+ // And run all assertions again
+ long now2 = 300;
+ long inactiveTxTimeNow2 = 250 * TxConstants.MAX_TX_PER_MS;
+ long expectedPruneUpperBound2 = 200 * TxConstants.MAX_TX_PER_MS;
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(expectedPruneUpperBound2, expectedPruneUpperBound2, expectedPruneUpperBound2,
+ ImmutableSet.of(expectedPruneUpperBound2),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+ TimeRegions expectedRegions2 =
+ new TimeRegions(now2,
+ ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR)
+ .add(getRegionName(txDataTable1, Bytes.toBytes(0)))
+ .build());
+ testUtil.compact(txDataTable1, true);
+ long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+ Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
+
+ Assert.assertEquals(expectedRegions2, dataJanitorState.getRegionsOnOrBeforeTime(now2));
+ Assert.assertEquals(expectedPruneUpperBound2,
+ dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+ Assert.assertEquals(inactiveTxTimeNow2, dataJanitorState.getInactiveTransactionBoundForTime(now2));
+ Assert.assertEquals(expectedRegions1, dataJanitorState.getRegionsOnOrBeforeTime(now1));
+ Assert.assertEquals(inactiveTxTimeNow1, dataJanitorState.getInactiveTransactionBoundForTime(now1));
+
+ transactionPruningPlugin.pruneComplete(now2, pruneUpperBound2);
+ Assert.assertEquals(expectedRegions2, dataJanitorState.getRegionsOnOrBeforeTime(now2));
+ Assert.assertEquals(expectedPruneUpperBound2,
+ dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
+ Assert.assertEquals(inactiveTxTimeNow2, dataJanitorState.getInactiveTransactionBoundForTime(now2));
+ Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(now1));
+ Assert.assertEquals(expectedPruneUpperBound1, dataJanitorState.getInactiveTransactionBoundForTime(now1));
+
+ } finally {
+ transactionPruningPlugin.destroy();
+ }
+ }
+
+ private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException {
+ HRegionLocation regionLocation =
+ testUtil.getConnection().getRegionLocator(dataTable).getRegionLocation(row);
+ return regionLocation.getRegionInfo().getRegionName();
+ }
+
+ /**
+ * A transaction co-processor that uses in-memory {@link TransactionSnapshot} for testing
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static class TestTransactionProcessor extends TransactionProcessor {
+ private static final AtomicLong lastMajorCompactionTime = new AtomicLong(-1);
+
+ @Override
+ protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+ return new Supplier<TransactionStateCache>() {
+ @Override
+ public TransactionStateCache get() {
+ return new InMemoryTransactionStateCache();
+ }
+ };
+ }
+
+ @Override
+ public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile,
+ CompactionRequest request) throws IOException {
+ super.postCompact(e, store, resultFile, request);
+ lastMajorCompactionTime.set(System.currentTimeMillis());
+ }
+ }
+
+ /**
+ * Used to supply in-memory {@link TransactionSnapshot} to {@link TestTransactionProcessor} for testing
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static class InMemoryTransactionStateCache extends TransactionStateCache {
+ private static TransactionVisibilityState transactionSnapshot;
+
+ public static void setTransactionSnapshot(TransactionVisibilityState transactionSnapshot) {
+ InMemoryTransactionStateCache.transactionSnapshot = transactionSnapshot;
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ // Nothing to do
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ // Nothing to do
+ }
+
+ @Override
+ public TransactionVisibilityState getLatestState() {
+ return transactionSnapshot;
+ }
+ }
+
+ @SuppressWarnings("WeakerAccess")
+ public static class TestTransactionPruningPlugin extends HBaseTransactionPruningPlugin {
+ @Override
+ protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
+ return tableDescriptor.hasCoprocessor(TestTransactionProcessor.class.getName());
+ }
+ }
+}
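The @BeforeClass in the test above enables pruning with a single configuration flag before the mini cluster starts, and the coprocessor initialization in TransactionProcessor (further down in this commit) reads the same flag plus the prune state table name. Here is a small illustrative sketch of that configuration handshake, using only constants that appear in this commit; it is a sketch, not a recommended deployment recipe.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.tephra.TxConstants;

public class PruneConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Same switch the test flips before starting the mini cluster.
    conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
    // The coprocessor resolves the prune state table name the same way.
    String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
                                 TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
    System.out.println("Invalid list pruning enabled; compaction state table: " + pruneTable);
  }
}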
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/abf34e5f/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
index e40cf76..6b266a2 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
@@ -649,6 +649,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
txDelete.setAttribute(entry.getKey(), entry.getValue());
}
txDelete.setDurability(delete.getDurability());
+ addToOperation(txDelete, tx);
return txDelete;
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/abf34e5f/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 40f8949..744fa4c 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -26,9 +26,11 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
@@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -54,6 +57,7 @@ import org.apache.tephra.TransactionCodec;
import org.apache.tephra.TxConstants;
import org.apache.tephra.coprocessor.TransactionStateCache;
import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
+import org.apache.tephra.hbase.txprune.CompactionState;
import org.apache.tephra.persist.TransactionVisibilityState;
import org.apache.tephra.util.TxUtils;
@@ -64,6 +68,8 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
/**
* {@code org.apache.hadoop.hbase.coprocessor.RegionObserver} coprocessor that handles server-side processing
@@ -97,11 +103,13 @@ import java.util.Set;
public class TransactionProcessor extends BaseRegionObserver {
private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
- private TransactionStateCache cache;
private final TransactionCodec txCodec;
+ private TransactionStateCache cache;
+ private CompactionState compactionState;
protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;
+ protected long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME);
public TransactionProcessor() {
this.txCodec = new TransactionCodec();
@@ -138,6 +146,20 @@ public class TransactionProcessor extends BaseRegionObserver {
if (readNonTxnData) {
LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString());
}
+
+ this.txMaxLifetimeMillis =
+ TimeUnit.SECONDS.toMillis(env.getConfiguration().getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
+ TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
+
+ boolean pruneEnabled = env.getConfiguration().getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+ if (pruneEnabled) {
+ String pruneTable = env.getConfiguration().get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+ compactionState = new CompactionState(env, TableName.valueOf(pruneTable), txMaxLifetimeMillis);
+ LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " +
+ pruneTable);
+ }
}
}
@@ -165,6 +187,13 @@ public class TransactionProcessor extends BaseRegionObserver {
}
@Override
+ public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)
+ throws IOException {
+ Transaction tx = getFromOperation(put);
+ ensureValidTxLifetime(tx);
+ }
+
+ @Override
public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit,
Durability durability) throws IOException {
// Translate deletes into our own delete tombstones
@@ -177,6 +206,9 @@ public class TransactionProcessor extends BaseRegionObserver {
return;
}
+ Transaction tx = getFromOperation(delete);
+ ensureValidTxLifetime(tx);
+
// Other deletes are client-initiated and need to be translated into our own tombstones
// TODO: this should delegate to the DeleteStrategy implementation.
Put deleteMarkers = new Put(delete.getRow(), delete.getTimeStamp());
@@ -269,8 +301,24 @@ public class TransactionProcessor extends BaseRegionObserver {
List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
CompactionRequest request)
throws IOException {
- return createStoreScanner(c.getEnvironment(), "compaction", cache.getLatestState(), store, scanners,
- scanType, earliestPutTs);
+ // Get the latest tx snapshot state for the compaction
+ TransactionVisibilityState snapshot = cache.getLatestState();
+
+ // Record tx state before the compaction
+ if (compactionState != null) {
+ compactionState.record(request, snapshot);
+ }
+ // Also make sure to use the same snapshot for the compaction
+ return createStoreScanner(c.getEnvironment(), "compaction", snapshot, store, scanners, scanType, earliestPutTs);
+ }
+
+ @Override
+ public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile,
+ CompactionRequest request) throws IOException {
+ // Persist the compaction state after a successful compaction
+ if (compactionState != null) {
+ compactionState.persist();
+ }
}
protected InternalScanner createStoreScanner(RegionCoprocessorEnvironment env, String action,
@@ -311,6 +359,19 @@ public class TransactionProcessor extends BaseRegionObserver {
return null;
}
+ private void ensureValidTxLifetime(@Nullable Transaction tx) throws DoNotRetryIOException {
+ if (tx == null) {
+ return;
+ }
+
+ boolean validLifetime =
+ TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis();
+ if (!validLifetime) {
+ throw new DoNotRetryIOException(String.format("Transaction %s has exceeded max lifetime %s ms",
+ tx.getTransactionId(), txMaxLifetimeMillis));
+ }
+ }
+
private boolean isRollbackOperation(OperationWithAttributes op) throws IOException {
return op.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null ||
// to support old clients
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/abf34e5f/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
new file mode 100644
index 0000000..2e61275
--- /dev/null
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.apache.tephra.util.TxUtils;
+
+import java.io.IOException;
+import javax.annotation.Nullable;
+
+/**
+ * Record compaction state for invalid list pruning
+ */
+public class CompactionState {
+ private static final Log LOG = LogFactory.getLog(CompactionState.class);
+
+ private final byte[] regionName;
+ private final String regionNameAsString;
+ private final TableName stateTable;
+ private final long txMaxLifetimeMills;
+ private final DataJanitorState dataJanitorState;
+ private volatile long pruneUpperBound = -1;
+
+ public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long txMaxLifetimeMills) {
+ this.regionName = env.getRegionInfo().getRegionName();
+ this.regionNameAsString = env.getRegionInfo().getRegionNameAsString();
+ this.stateTable = stateTable;
+ this.txMaxLifetimeMills = txMaxLifetimeMills;
+ this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+ @Override
+ public Table get() throws IOException {
+ return env.getTable(stateTable);
+ }
+ });
+ }
+
+ /**
+ * Records the transaction state used for a compaction. This method is called when the compaction starts.
+ *
+ * @param request {@link CompactionRequest} for the compaction
+ * @param snapshot transaction state that will be used for the compaction
+ */
+ public void record(CompactionRequest request, @Nullable TransactionVisibilityState snapshot) {
+ if (request.isMajor() && snapshot != null) {
+ Transaction tx = TxUtils.createDummyTransaction(snapshot);
+ pruneUpperBound = TxUtils.getPruneUpperBound(tx);
+ LOG.debug(
+ String.format("Computed prune upper bound %s for compaction request %s using transaction state from time %s",
+ pruneUpperBound, request, snapshot.getTimestamp()));
+ } else {
+ pruneUpperBound = -1;
+ }
+ }
+
+ /**
+ * Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}.
+ * This method is called after the compaction has successfully completed.
+ */
+ public void persist() {
+ if (pruneUpperBound != -1) {
+ try {
+ dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound);
+ LOG.debug(String.format("Saved prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
+ } catch (IOException e) {
+ LOG.warn(String.format("Cannot record prune upper bound in table %s after compacting region %s",
+ stateTable, regionNameAsString), e);
+ }
+ }
+ }
+}
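CompactionState above deliberately splits computing the bound (record, when a major compaction starts) from writing it (persist, only after the compaction completes), so a failed or minor compaction never advances the stored prune upper bound. Below is a minimal stand-in sketch of that contract; the map plays the role of the prune state table and the bound is supplied directly instead of being derived from a TransactionVisibilityState.

import java.util.HashMap;
import java.util.Map;

public class CompactionStateSketch {
  private final Map<String, Long> store = new HashMap<>(); // stand-in for the prune state table
  private long pruneUpperBound = -1;

  // Called when a compaction starts: remember the bound only for major compactions.
  void record(boolean isMajor, Long snapshotBound) {
    pruneUpperBound = (isMajor && snapshotBound != null) ? snapshotBound : -1;
  }

  // Called only after the compaction completed successfully: persist what record() captured.
  void persist(String regionName) {
    if (pruneUpperBound != -1) {
      store.put(regionName, pruneUpperBound);
    }
  }

  public static void main(String[] args) {
    CompactionStateSketch state = new CompactionStateSketch();
    state.record(true, 50L);
    state.persist("region-1");  // bound 50 is now visible to the pruning plugin
    state.record(false, 150L);  // minor compaction: nothing to record
    state.persist("region-1");  // no-op, bound for region-1 stays at 50
    System.out.println(state.store);
  }
}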
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/abf34e5f/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
new file mode 100644
index 0000000..c6d03c4
--- /dev/null
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import javax.annotation.Nullable;
+
+/**
+ * Persist data janitor state into an HBase table.
+ * This is used by both {@link TransactionProcessor} and by the {@link HBaseTransactionPruningPlugin}
+ * to persist and read the compaction state.
+ */
+@SuppressWarnings("WeakerAccess")
+public class DataJanitorState {
+ public static final byte[] FAMILY = {'f'};
+
+ private static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
+ private static final byte[] REGION_TIME_COL = {'r'};
+ private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
+
+ private static final byte[] REGION_KEY_PREFIX = {0x1};
+ private static final byte[] REGION_KEY_PREFIX_STOP = {0x2};
+
+ private static final byte[] REGION_TIME_KEY_PREFIX = {0x2};
+ private static final byte[] REGION_TIME_KEY_PREFIX_STOP = {0x3};
+
+ private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX = {0x3};
+ private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP = {0x4};
+
+ private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+
+ private final TableSupplier stateTableSupplier;
+
+
+ public DataJanitorState(TableSupplier stateTableSupplier) {
+ this.stateTableSupplier = stateTableSupplier;
+ }
+
+ // ----------------------------------------------------------------
+ // ------- Methods for prune upper bound for a given region -------
+ // ----------------------------------------------------------------
+ // The data is stored in the following format -
+ // Key: 0x1<region-id>
+ // Col 'p': <prune upper bound>
+ // ----------------------------------------------------------------
+
+ /**
+ * Persist the latest prune upper bound for a given region. This is called by {@link TransactionProcessor}
+ * after major compaction.
+ *
+ * @param regionId region id
+ * @param pruneUpperBound the latest prune upper bound for the region
+ * @throws IOException when not able to persist the data to HBase
+ */
+ public void savePruneUpperBoundForRegion(byte[] regionId, long pruneUpperBound) throws IOException {
+ try (Table stateTable = stateTableSupplier.get()) {
+ Put put = new Put(makeRegionKey(regionId));
+ put.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL, Bytes.toBytes(pruneUpperBound));
+ stateTable.put(put);
+ }
+ }
+
+ /**
+ * Get latest prune upper bound for a given region. This indicates the largest invalid transaction that no
+ * longer has writes in this region.
+ *
+ * @param regionId region id
+ * @return latest prune upper bound for the region
+ * @throws IOException when not able to read the data from HBase
+ */
+ public long getPruneUpperBoundForRegion(byte[] regionId) throws IOException {
+ try (Table stateTable = stateTableSupplier.get()) {
+ Get get = new Get(makeRegionKey(regionId));
+ get.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+ byte[] result = stateTable.get(get).getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
+ return result == null ? -1 : Bytes.toLong(result);
+ }
+ }
+
+ /**
+ * Get latest prune upper bounds for given regions. This is a batch operation of method
+ * {@link #getPruneUpperBoundForRegion(byte[])}
+ *
+ * @param regions a set of regions
+ * @return a map containing region id and its latest prune upper bound value
+ * @throws IOException when not able to read the data from HBase
+ */
+ public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException {
+ Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ try (Table stateTable = stateTableSupplier.get()) {
+ byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+ Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+ try (ResultScanner scanner = stateTable.getScanner(scan)) {
+ Result next;
+ while ((next = scanner.next()) != null) {
+ byte[] region = getRegionFromKey(next.getRow());
+ if (regions.contains(region)) {
+ byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
+ if (timeBytes != null) {
+ long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+ resultMap.put(region, pruneUpperBoundRegion);
+ }
+ }
+ }
+ }
+ return resultMap;
+ }
+ }
+
+ /**
+ * Delete prune upper bounds for the regions that are not in the given exclude set, and the
+ * prune upper bound is less than the given value.
+ * After the invalid list is pruned up to deletionPruneUpperBound, we do not need entries for regions that have
+ * prune upper bound less than deletionPruneUpperBound. We however limit the deletion to only regions that are
+ * no longer in existence (due to deletion, etc.), to avoid update/delete race conditions.
+ *
+ * @param deletionPruneUpperBound prune upper bound below which regions will be deleted
+ * @param excludeRegions set of regions that should not be deleted
+ * @throws IOException when not able to delete data in HBase
+ */
+ public void deletePruneUpperBounds(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
+ throws IOException {
+ try (Table stateTable = stateTableSupplier.get()) {
+ byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+ Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+ try (ResultScanner scanner = stateTable.getScanner(scan)) {
+ Result next;
+ while ((next = scanner.next()) != null) {
+ byte[] region = getRegionFromKey(next.getRow());
+ if (!excludeRegions.contains(region)) {
+ byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
+ if (timeBytes != null) {
+ long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+ if (pruneUpperBoundRegion < deletionPruneUpperBound) {
+ stateTable.delete(new Delete(next.getRow()));
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ // ---------------------------------------------------
+ // ------- Methods for regions at a given time -------
+ // ---------------------------------------------------
+ // Key: 0x2<inverted time><region-id>
+ // Col 'r': <empty byte array>
+ // ---------------------------------------------------
+
+ /**
+ * Persist the regions for the given time. {@link HBaseTransactionPruningPlugin} saves the set of
+ * transactional regions existing in the HBase instance periodically.
+ *
+ * @param time timestamp in milliseconds
+ * @param regions set of regions at the time
+ * @throws IOException when not able to persist the data to HBase
+ */
+ public void saveRegionsForTime(long time, Set<byte[]> regions) throws IOException {
+ byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+ try (Table stateTable = stateTableSupplier.get()) {
+ for (byte[] region : regions) {
+ Put put = new Put(makeTimeRegionKey(timeBytes, region));
+ put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
+ stateTable.put(put);
+ }
+ }
+ }
+
+ /**
+ * Return the set of regions saved for the time at or before the given time. This method finds the greatest time
+ * that is less than or equal to the given time, and then returns all regions with that exact time, but none that are
+ * older than that.
+ *
+ * @param time timestamp in milliseconds
+ * @return set of regions and time at which they were recorded, or null if no regions found
+ * @throws IOException when not able to read the data from HBase
+ */
+ @Nullable
+ public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException {
+ byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+ try (Table stateTable = stateTableSupplier.get()) {
+ Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, REGION_TIME_COL);
+
+ SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ long currentRegionTime = -1;
+ try (ResultScanner scanner = stateTable.getScanner(scan)) {
+ Result next;
+ while ((next = scanner.next()) != null) {
+ Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
+ // Stop if reached next time value
+ if (currentRegionTime == -1) {
+ currentRegionTime = timeRegion.getKey();
+ } else if (timeRegion.getKey() < currentRegionTime) {
+ break;
+ } else if (timeRegion.getKey() > currentRegionTime) {
+ throw new IllegalStateException(
+ String.format("Got out of order time %d when expecting time less than or equal to %d",
+ timeRegion.getKey(), currentRegionTime));
+ }
+ regions.add(timeRegion.getValue());
+ }
+ }
+ return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, regions);
+ }
+ }
+
+ /**
+ * Delete all the regions that were recorded for times equal to or less than the given time.
+ *
+ * @param time timestamp in milliseconds
+ * @throws IOException when not able to delete data in HBase
+ */
+ public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException {
+ byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+ try (Table stateTable = stateTableSupplier.get()) {
+ Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, REGION_TIME_COL);
+
+ try (ResultScanner scanner = stateTable.getScanner(scan)) {
+ Result next;
+ while ((next = scanner.next()) != null) {
+ stateTable.delete(new Delete(next.getRow()));
+ }
+ }
+ }
+ }
+
+ // ---------------------------------------------------------------------
+ // ------- Methods for inactive transaction bound for given time -------
+ // ---------------------------------------------------------------------
+ // Key: 0x3<inverted time>
+ // Col 'p': <inactive transaction bound>
+ // ---------------------------------------------------------------------
+
+ /**
+ * Persist the inactive transaction bound for a given time. This is the smallest transaction that is not
+ * in progress and that will not have writes in any HBase region created after the given time.
+ *
+ * @param time time in milliseconds
+ * @param inactiveTransactionBound inactive transaction bound for the given time
+ * @throws IOException when not able to persist the data to HBase
+ */
+ public void saveInactiveTransactionBoundForTime(long time, long inactiveTransactionBound) throws IOException {
+ try (Table stateTable = stateTableSupplier.get()) {
+ Put put = new Put(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))));
+ put.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL, Bytes.toBytes(inactiveTransactionBound));
+ stateTable.put(put);
+ }
+ }
+
+ /**
+ * Return inactive transaction bound for the given time.
+ *
+ * @param time time in milliseconds
+ * @return inactive transaction bound for the given time, or -1 if none was recorded
+ * @throws IOException when not able to read the data from HBase
+ */
+ public long getInactiveTransactionBoundForTime(long time) throws IOException {
+ try (Table stateTable = stateTableSupplier.get()) {
+ Get get = new Get(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))));
+ get.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
+ byte[] result = stateTable.get(get).getValue(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
+ return result == null ? -1 : Bytes.toLong(result);
+ }
+ }
+
+ /**
+ * Delete all inactive transaction bounds recorded at a time equal to or less than the given time.
+ *
+ * @param time time in milliseconds
+ * @throws IOException when not able to delete data in HBase
+ */
+ public void deleteInactiveTransactionBoundsOnOrBeforeTime(long time) throws IOException {
+ try (Table stateTable = stateTableSupplier.get()) {
+ Scan scan = new Scan(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))),
+ INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP);
+ scan.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
+
+ try (ResultScanner scanner = stateTable.getScanner(scan)) {
+ Result next;
+ while ((next = scanner.next()) != null) {
+ stateTable.delete(new Delete(next.getRow()));
+ }
+ }
+ }
+ }
+
+ private byte[] makeRegionKey(byte[] regionId) {
+ return Bytes.add(REGION_KEY_PREFIX, regionId);
+ }
+
+ private byte[] getRegionFromKey(byte[] regionKey) {
+ int prefixLen = REGION_KEY_PREFIX.length;
+ return Bytes.copy(regionKey, prefixLen, regionKey.length - prefixLen);
+ }
+
+ private byte[] makeTimeRegionKey(byte[] time, byte[] regionId) {
+ return Bytes.add(REGION_TIME_KEY_PREFIX, time, regionId);
+ }
+
+ private byte[] makeInactiveTransactionBoundTimeKey(byte[] time) {
+ return Bytes.add(INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX, time);
+ }
+
+ private Map.Entry<Long, byte[]> getTimeRegion(byte[] key) {
+ int offset = REGION_TIME_KEY_PREFIX.length;
+ long time = getInvertedTime(Bytes.toLong(key, offset));
+ offset += Bytes.SIZEOF_LONG;
+ byte[] regionName = Bytes.copy(key, offset, key.length - offset);
+ return Maps.immutableEntry(time, regionName);
+ }
+
+ private long getInvertedTime(long time) {
+ return Long.MAX_VALUE - time;
+ }
+
+ /**
+ * Supplies the table used for persisting state.
+ */
+ public interface TableSupplier {
+ Table get() throws IOException;
+ }
+}
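
[Editorial note] The time-keyed methods above (saveRegionsForTime, getRegionsOnOrBeforeTime, and the
delete...OnOrBeforeTime variants) depend on the inverted-time row keys produced by getInvertedTime:
storing Long.MAX_VALUE - time means a forward scan starting at the key for time t reaches the most
recent snapshot at or before t first. A minimal standalone sketch of that ordering, with hypothetical
helper names that are not part of this patch:

import java.nio.ByteBuffer;

// Sketch only: invertedTimeBytes mirrors DataJanitorState#getInvertedTime plus Bytes.toBytes(long).
public class InvertedTimeOrderingSketch {

  static byte[] invertedTimeBytes(long time) {
    // Big-endian encoding of (Long.MAX_VALUE - time); larger times become smaller keys
    return ByteBuffer.allocate(Long.BYTES).putLong(Long.MAX_VALUE - time).array();
  }

  // Unsigned lexicographic comparison, the same ordering HBase applies to row keys
  static int compare(byte[] a, byte[] b) {
    for (int i = 0; i < Math.min(a.length, b.length); i++) {
      int diff = (a[i] & 0xff) - (b[i] & 0xff);
      if (diff != 0) {
        return diff;
      }
    }
    return a.length - b.length;
  }

  public static void main(String[] args) {
    byte[] newer = invertedTimeBytes(2000L);
    byte[] older = invertedTimeBytes(1000L);
    // The newer snapshot sorts before the older one, so a forward scan starting at the key
    // for a given time hits the most recent snapshot at or before that time first.
    System.out.println(compare(newer, older) < 0); // prints true
  }
}
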
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/abf34e5f/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
new file mode 100644
index 0000000..2e3b8d0
--- /dev/null
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.txprune.TransactionPruningPlugin;
+import org.apache.tephra.util.TxUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Default implementation of the {@link TransactionPruningPlugin} for HBase.
+ *
+ * This plugin determines the prune upper bound for transactional HBase tables that use
+ * coprocessor {@link TransactionProcessor}.
+ *
+ * <h3>State storage:</h3>
+ *
+ * This plugin expects the TransactionProcessor to save the prune upper bound for invalid transactions
+ * after every major compaction of a region. Let's call this <i>(region, prune upper bound)</i>.
+ * In addition, the plugin persists the following information during a run at time <i>t</i>:
+ * <ul>
+ * <li>
+ * <i>(t, set of regions)</i>: Set of transactional regions at time <i>t</i>.
+ * Transactional regions are regions of the tables that have the coprocessor TransactionProcessor
+ * attached to them.
+ * </li>
+ * <li>
+ * <i>(t, inactive transaction bound)</i>: This is the smallest not in-progress transaction that
+ * will not have writes in any HBase regions that are created after time <i>t</i>.
+ * This value is determined by the Transaction Service based on the transaction state at time <i>t</i>
+ * and passed on to the plugin.
+ * </li>
+ * </ul>
+ *
+ * <h3>Computing prune upper bound:</h3>
+ *
+ * In a typical HBase instance, the number of regions changes constantly due to region creations,
+ * splits and merges. At any given time there can be regions on which a major compaction has not yet been run.
+ * Since the prune upper bound is recorded for a region only after a major compaction,
+ * using only the latest set of regions we may not be able to find the
+ * prune upper bounds for all the current regions. Hence we persist the set of regions that exist at the time
+ * of each run of the plugin, and use the historical region sets for times <i>t</i>, <i>t - 1</i>, etc.
+ * to determine the prune upper bound.
+ *
+ * From the region sets saved at time <i>t</i>, <i>t - 1</i>, etc.,
+ * the plugin tries to find the latest <i>(t, set of regions)</i> where all regions have been major compacted,
+ * i.e., all regions have a prune upper bound recorded in <i>(region, prune upper bound)</i>.
+ * <br/>
+ * If such a set is found for time <i>t1</i>, the prune upper bound returned by the plugin is the minimum of
+ * <ul>
+ * <li>Prune upper bounds of regions in set <i>(t1, set of regions)</i></li>
+ * <li>Inactive transaction bound from <i>(t1, inactive transaction bound)</i></li>
+ * </ul>
+ *
+ * <p/>
+ * Above, when we find <i>(t1, set of regions)</i>, there may be a region that was created after time <i>t1</i>,
+ * but has a data write from an invalid transaction that is smaller than the prune upper bounds of all
+ * regions in <i>(t1, set of regions)</i>. This is possible because <i>(region, prune upper bound)</i> persisted by
+ * TransactionProcessor is always the latest prune upper bound for a region.
+ * <br/>
+ * However a region created after time <i>t1</i> cannot have writes from an invalid transaction that is smaller than
+ * inactive transaction bound at the time the region was created.
+ * Since we limit the plugin prune upper bound using <i>(t1, inactive transaction bound)</i>,
+ * there should be no invalid transactions smaller than the plugin prune upper bound with writes in any
+ * transactional region of this HBase instance.
+ *
+ * <p/>
+ * Note: If your tables use a transactional coprocessor other than TransactionProcessor,
+ * then you may need to write a new plugin to compute the prune upper bound for those tables.
+ */
+@SuppressWarnings("WeakerAccess")
+public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
+ public static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionPruningPlugin.class);
+
+ protected Configuration conf;
+ protected Connection connection;
+ protected DataJanitorState dataJanitorState;
+
+ @Override
+ public void initialize(Configuration conf) throws IOException {
+ this.conf = conf;
+ this.connection = ConnectionFactory.createConnection(conf);
+
+ final TableName stateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+ LOG.info("Initializing plugin with state table {}:{}", stateTable.getNamespaceAsString(),
+ stateTable.getNameAsString());
+ this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+ @Override
+ public Table get() throws IOException {
+ return connection.getTable(stateTable);
+ }
+ });
+ }
+
+ /**
+ * Determines the prune upper bound for the data store as described above.
+ */
+ @Override
+ public long fetchPruneUpperBound(long time, long inactiveTransactionBound) throws IOException {
+ LOG.debug("Fetching prune upper bound for time {} and inactive transaction bound {}",
+ time, inactiveTransactionBound);
+ if (time < 0 || inactiveTransactionBound < 0) {
+ return -1;
+ }
+
+ // Get all the current transactional regions
+ SortedSet<byte[]> transactionalRegions = getTransactionalRegions();
+ if (!transactionalRegions.isEmpty()) {
+ LOG.debug("Saving {} transactional regions for time {}", transactionalRegions.size(), time);
+ dataJanitorState.saveRegionsForTime(time, transactionalRegions);
+ // Save inactive transaction bound for time as the final step.
+ // We can then use its existence to determine whether the data for a given time is complete
+ LOG.debug("Saving inactive transaction bound {} for time {}", inactiveTransactionBound, time);
+ dataJanitorState.saveInactiveTransactionBoundForTime(time, inactiveTransactionBound);
+ }
+
+ return computePruneUpperBound(new TimeRegions(time, transactionalRegions));
+ }
+
+ /**
+ * After the invalid list has been pruned, this cleans up state information that is no longer required.
+ * This includes -
+ * <ul>
+ * <li>
+ * <i>(region, prune upper bound)</i> - prune upper bounds smaller than maxPrunedInvalid, for regions
+ * that are no longer present
+ * </li>
+ * <li>
+ * <i>(t, set of regions)</i> - region sets that were recorded on or before the start time
+ * of maxPrunedInvalid
+ * </li>
+ * <li>
+ * <i>(t, inactive transaction bound)</i> - inactive transaction bounds recorded on or before the start time
+ * of maxPrunedInvalid
+ * </li>
+ * </ul>
+ */
+ @Override
+ public void pruneComplete(long time, long maxPrunedInvalid) throws IOException {
+ LOG.debug("Prune complete for time {} and prune upper bound {}", time, maxPrunedInvalid);
+ if (time < 0 || maxPrunedInvalid < 0) {
+ return;
+ }
+
+ // Get regions for the current time, so as to not delete the prune upper bounds for them.
+ // The prune upper bounds for regions are recorded by TransactionProcessor and the deletion
+ // is done by this class. To avoid update/delete race condition, we only delete prune upper
+ // bounds for the stale regions.
+ TimeRegions regionsToExclude = dataJanitorState.getRegionsOnOrBeforeTime(time);
+ if (regionsToExclude != null) {
+ LOG.debug("Deleting prune upper bounds smaller than {} for stale regions", maxPrunedInvalid);
+ dataJanitorState.deletePruneUpperBounds(maxPrunedInvalid, regionsToExclude.getRegions());
+ } else {
+ LOG.warn("Cannot find saved regions on or before time {}", time);
+ }
+ long pruneTime = TxUtils.getTimestamp(maxPrunedInvalid);
+ LOG.debug("Deleting regions recorded before time {}", pruneTime);
+ dataJanitorState.deleteAllRegionsOnOrBeforeTime(pruneTime);
+ LOG.debug("Deleting inactive transaction bounds recorded on or before time {}", pruneTime);
+ dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(pruneTime);
+ }
+
+ @Override
+ public void destroy() {
+ LOG.info("Stopping plugin...");
+ try {
+ connection.close();
+ } catch (IOException e) {
+ LOG.error("Got exception while closing HBase connection", e);
+ }
+ }
+
+ protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
+ return tableDescriptor.hasCoprocessor(TransactionProcessor.class.getName());
+ }
+
+ protected SortedSet<byte[]> getTransactionalRegions() throws IOException {
+ SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ try (Admin admin = connection.getAdmin()) {
+ HTableDescriptor[] tableDescriptors = admin.listTables();
+ LOG.debug("Got {} tables to process", tableDescriptors == null ? 0 : tableDescriptors.length);
+ if (tableDescriptors != null) {
+ for (HTableDescriptor tableDescriptor : tableDescriptors) {
+ if (isTransactionalTable(tableDescriptor)) {
+ List<HRegionInfo> tableRegions = admin.getTableRegions(tableDescriptor.getTableName());
+ LOG.debug("Regions for table {}: {}", tableDescriptor.getTableName(), tableRegions);
+ if (tableRegions != null) {
+ for (HRegionInfo region : tableRegions) {
+ regions.add(region.getRegionName());
+ }
+ }
+ } else {
+ LOG.debug("{} is not a transactional table", tableDescriptor.getTableName());
+ }
+ }
+ }
+ }
+ return regions;
+ }
+
+ /**
+ * Try to find the latest set of regions in which all regions have been major compacted, and
+ * compute the prune upper bound from them. Starting from newest to oldest, this looks at the
+ * region sets that have been saved periodically, and joins them with the prune upper bound data
+ * recorded for each region after a major compaction.
+ *
+ * @param timeRegions the latest set of regions
+ * @return prune upper bound
+ * @throws IOException when not able to talk to HBase
+ */
+ private long computePruneUpperBound(TimeRegions timeRegions) throws IOException {
+ do {
+ LOG.debug("Computing prune upper bound for {}", timeRegions);
+ SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
+ long time = timeRegions.getTime();
+
+ Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
+ logPruneUpperBoundRegions(pruneUpperBoundRegions);
+ // If prune upper bounds are found for all the transactional regions, then compute the prune upper bound
+ // across all regions
+ if (!transactionalRegions.isEmpty() && pruneUpperBoundRegions.size() == transactionalRegions.size()) {
+ long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
+ LOG.debug("Found max prune upper bound {} for time {}", inactiveTransactionBound, time);
+ // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
+ if (inactiveTransactionBound != -1) {
+ Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
+ return Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
+ "and hence the data must be incomplete", time);
+ }
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ Sets.SetView<byte[]> difference = Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
+ LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}",
+ time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN));
+ }
+ }
+
+ timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time - 1);
+ } while (timeRegions != null);
+ return -1;
+ }
+
+ private void logPruneUpperBoundRegions(Map<byte[], Long> pruneUpperBoundRegions) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got region - prune upper bound map: {}",
+ Iterables.transform(pruneUpperBoundRegions.entrySet(),
+ new Function<Map.Entry<byte[], Long>, Map.Entry<String, Long>>() {
+ @Override
+ public Map.Entry<String, Long> apply(Map.Entry<byte[], Long> input) {
+ String regionName = TimeRegions.BYTE_ARR_TO_STRING_FN.apply(input.getKey());
+ return Maps.immutableEntry(regionName, input.getValue());
+ }
+ }));
+ }
+ }
+}
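
[Editorial note] For reference, a hypothetical driver showing the call sequence described in the class
Javadoc (initialize, fetchPruneUpperBound, pruneComplete, destroy). In Tephra the transaction service
invokes the plugin and supplies these values, so this is only an illustration under assumed inputs:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.tephra.hbase.txprune.HBaseTransactionPruningPlugin;

import java.io.IOException;

public class PruningPluginDriverSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    HBaseTransactionPruningPlugin plugin = new HBaseTransactionPruningPlugin();
    plugin.initialize(conf);
    try {
      long now = System.currentTimeMillis();
      // Assumed value; the real bound comes from the transaction state at time 'now'
      long inactiveTransactionBound = 5000L;
      long pruneUpperBound = plugin.fetchPruneUpperBound(now, inactiveTransactionBound);
      if (pruneUpperBound > 0) {
        // ... the invalid transaction list would be pruned up to pruneUpperBound here ...
        plugin.pruneComplete(now, pruneUpperBound);
      }
    } finally {
      plugin.destroy();
    }
  }
}
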
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/abf34e5f/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java
new file mode 100644
index 0000000..4ac8887
--- /dev/null
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.Objects;
+import java.util.SortedSet;
+
+/**
+ * Contains information on the set of transactional regions recorded at a given time.
+ */
+@SuppressWarnings("WeakerAccess")
+public class TimeRegions {
+ static final Function<byte[], String> BYTE_ARR_TO_STRING_FN =
+ new Function<byte[], String>() {
+ @Override
+ public String apply(byte[] input) {
+ return Bytes.toStringBinary(input);
+ }
+ };
+
+ private final long time;
+ private final SortedSet<byte[]> regions;
+
+ public TimeRegions(long time, SortedSet<byte[]> regions) {
+ this.time = time;
+ this.regions = regions;
+ }
+
+ public long getTime() {
+ return time;
+ }
+
+ public SortedSet<byte[]> getRegions() {
+ return regions;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TimeRegions that = (TimeRegions) o;
+ return time == that.time &&
+ Objects.equals(regions, that.regions);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(time, regions);
+ }
+
+ @Override
+ public String toString() {
+ Iterable<String> regionStrings = Iterables.transform(regions, BYTE_ARR_TO_STRING_FN);
+ return "TimeRegions{" +
+ "time=" + time +
+ ", regions=[" + Joiner.on(" ").join(regionStrings) + "]" +
+ '}';
+ }
+}
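
[Editorial note] A small usage sketch for TimeRegions; the region names below are made up. Because
byte[] has no natural ordering, the regions set must be built with an explicit comparator such as
Bytes.BYTES_COMPARATOR, matching how DataJanitorState and the pruning plugin build their region sets:

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.hbase.txprune.TimeRegions;

import java.util.SortedSet;
import java.util.TreeSet;

public class TimeRegionsSketch {
  public static void main(String[] args) {
    // byte[] has no natural ordering, so the set needs an explicit comparator
    SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    regions.add(Bytes.toBytes("mytable,,1484000000000.region1."));       // made-up region name
    regions.add(Bytes.toBytes("mytable,row500,1484000000000.region2.")); // made-up region name

    TimeRegions timeRegions = new TimeRegions(System.currentTimeMillis(), regions);
    System.out.println(timeRegions); // e.g. TimeRegions{time=..., regions=[... ...]}
  }
}
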