You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/11/09 01:32:25 UTC
[1/3] incubator-tephra git commit: Reduce test stdout output due to
Travis log output limit of 4 MB
Repository: incubator-tephra
Updated Branches:
refs/heads/master 99c7bec49 -> f5f682af6
Reduce test stdout output due to Travis log output limit of 4 MB
Signed-off-by: poorna <po...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/f5f682af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/f5f682af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/f5f682af
Branch: refs/heads/master
Commit: f5f682af68e7a7884788de5f7bba41d25d80280f
Parents: 2ae7032
Author: poorna <po...@cask.co>
Authored: Mon Nov 7 15:28:48 2016 -0800
Committer: poorna <po...@apache.org>
Committed: Tue Nov 8 17:30:47 2016 -0800
----------------------------------------------------------------------
.travis.yml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/f5f682af/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 898142d..fb3a64c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -29,7 +29,7 @@ branches:
- /^hotfix\/.*$/
- /^release\/.*$/
-script: mvn test -Dsurefire.redirectTestOutputToFile=false
+script: mvn test
sudo: false
[3/3] incubator-tephra git commit: Refactor existing test
Posted by po...@apache.org.
Refactor existing test
Signed-off-by: poorna <po...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/e56f6352
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/e56f6352
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/e56f6352
Branch: refs/heads/master
Commit: e56f635231c8281ee656cb53576433491e362742
Parents: 99c7bec
Author: poorna <po...@cask.co>
Authored: Fri Oct 28 17:46:01 2016 -0700
Committer: poorna <po...@apache.org>
Committed: Tue Nov 8 17:30:47 2016 -0800
----------------------------------------------------------------------
tephra-hbase-compat-1.1-base/pom.xml | 5 +
.../tephra/hbase/AbstractHBaseTableTest.java | 107 +++++++++++++++++++
.../hbase/TransactionAwareHTableTest.java | 75 ++-----------
3 files changed, 123 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e56f6352/tephra-hbase-compat-1.1-base/pom.xml
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/pom.xml b/tephra-hbase-compat-1.1-base/pom.xml
index b0eee6d..b6a58e0 100644
--- a/tephra-hbase-compat-1.1-base/pom.xml
+++ b/tephra-hbase-compat-1.1-base/pom.xml
@@ -28,6 +28,11 @@
<artifactId>tephra-hbase-compat-1.1-base</artifactId>
<name>Apache Tephra HBase 1.1 Compatibility Base</name>
+ <properties>
+ <hadoop.version>2.5.1</hadoop.version>
+ <hbase11.version>1.1.1</hbase11.version>
+ </properties>
+
<packaging>pom</packaging>
<modules>
<module>tephra-hbase-compat-1.2-cdh</module>
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e56f6352/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
new file mode 100644
index 0000000..68c43ae
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ *
+ */
+@SuppressWarnings("WeakerAccess")
+public abstract class AbstractHBaseTableTest {
+ static HBaseTestingUtility testUtil;
+ static HBaseAdmin hBaseAdmin;
+ static Configuration conf;
+
+ @BeforeClass
+ public static void startMiniCluster() throws Exception {
+ testUtil = new HBaseTestingUtility();
+ conf = testUtil.getConfiguration();
+
+ // Tune down the connection thread pool size
+ conf.setInt("hbase.hconnection.threads.core", 5);
+ conf.setInt("hbase.hconnection.threads.max", 10);
+ // Tune down handler threads in regionserver
+ conf.setInt("hbase.regionserver.handler.count", 10);
+
+ // Set to random port
+ conf.setInt("hbase.master.port", 0);
+ conf.setInt("hbase.master.info.port", 0);
+ conf.setInt("hbase.regionserver.port", 0);
+ conf.setInt("hbase.regionserver.info.port", 0);
+
+ testUtil.startMiniCluster();
+ hBaseAdmin = testUtil.getHBaseAdmin();
+ }
+
+ @AfterClass
+ public static void shutdownMiniCluster() throws Exception {
+ try {
+ if (hBaseAdmin != null) {
+ hBaseAdmin.close();
+ }
+ } finally {
+ testUtil.shutdownMiniCluster();
+ }
+ }
+
+ static HTable createTable(byte[] tableName, byte[][] columnFamilies) throws Exception {
+ return createTable(tableName, columnFamilies, false,
+ Collections.singletonList(TransactionProcessor.class.getName()));
+ }
+
+ static HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
+ List<String> coprocessors) throws Exception {
+ HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+ for (byte[] family : columnFamilies) {
+ HColumnDescriptor columnDesc = new HColumnDescriptor(family);
+ columnDesc.setMaxVersions(Integer.MAX_VALUE);
+ columnDesc.setValue(TxConstants.PROPERTY_TTL, String.valueOf(100000)); // in millis
+ desc.addFamily(columnDesc);
+ }
+ if (existingData) {
+ desc.setValue(TxConstants.READ_NON_TX_DATA, "true");
+ }
+ // Assign coprocessor priorities in increasing order, starting just above PRIORITY_USER
+ int priority = Coprocessor.PRIORITY_USER;
+ // order in list is the same order that coprocessors will be invoked
+ for (String coprocessor : coprocessors) {
+ desc.addCoprocessor(coprocessor, null, ++priority, null);
+ }
+ hBaseAdmin.createTable(desc);
+ testUtil.waitTableAvailable(tableName, 5000);
+ return new HTable(testUtil.getConfiguration(), tableName);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e56f6352/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
index de1fa6b..c336712 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
@@ -19,21 +19,14 @@ package org.apache.tephra.hbase;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.OperationWithAttributes;
@@ -89,14 +82,11 @@ import static org.junit.Assert.fail;
/**
* Tests for TransactionAwareHTables.
*/
-public class TransactionAwareHTableTest {
+public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTableTest.class);
- private static HBaseTestingUtility testUtil;
- private static HBaseAdmin hBaseAdmin;
- private static TransactionStateStorage txStateStorage;
- private static TransactionManager txManager;
- private static Configuration conf;
+ static TransactionStateStorage txStateStorage;
+ static TransactionManager txManager;
private TransactionContext transactionContext;
private TransactionAwareHTable transactionAwareHTable;
private HTable hTable;
@@ -146,23 +136,6 @@ public class TransactionAwareHTableTest {
@BeforeClass
public static void setupBeforeClass() throws Exception {
- testUtil = new HBaseTestingUtility();
- conf = testUtil.getConfiguration();
-
- // Tune down the connection thread pool size
- conf.setInt("hbase.hconnection.threads.core", 5);
- conf.setInt("hbase.hconnection.threads.max", 10);
- // Tunn down handler threads in regionserver
- conf.setInt("hbase.regionserver.handler.count", 10);
-
- // Set to random port
- conf.setInt("hbase.master.port", 0);
- conf.setInt("hbase.master.info.port", 0);
- conf.setInt("hbase.regionserver.port", 0);
- conf.setInt("hbase.regionserver.info.port", 0);
-
- testUtil.startMiniCluster();
- hBaseAdmin = testUtil.getHBaseAdmin();
txStateStorage = new InMemoryTransactionStateStorage();
txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
txManager.startAndWait();
@@ -170,8 +143,9 @@ public class TransactionAwareHTableTest {
@AfterClass
public static void shutdownAfterClass() throws Exception {
- testUtil.shutdownMiniCluster();
- hBaseAdmin.close();
+ if (txManager != null) {
+ txManager.stopAndWait();
+ }
}
@Before
@@ -187,34 +161,6 @@ public class TransactionAwareHTableTest {
hBaseAdmin.deleteTable(TestBytes.table);
}
- private HTable createTable(byte[] tableName, byte[][] columnFamilies) throws Exception {
- return createTable(tableName, columnFamilies, false, Collections.<String>emptyList());
- }
-
- private HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
- List<String> coprocessors) throws Exception {
- HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
- for (byte[] family : columnFamilies) {
- HColumnDescriptor columnDesc = new HColumnDescriptor(family);
- columnDesc.setMaxVersions(Integer.MAX_VALUE);
- columnDesc.setValue(TxConstants.PROPERTY_TTL, String.valueOf(100000)); // in millis
- desc.addFamily(columnDesc);
- }
- if (existingData) {
- desc.setValue(TxConstants.READ_NON_TX_DATA, "true");
- }
- // Divide individually to prevent any overflow
- int priority = Coprocessor.PRIORITY_USER;
- desc.addCoprocessor(TransactionProcessor.class.getName(), null, priority, null);
- // order in list is the same order that coprocessors will be invoked
- for (String coprocessor : coprocessors) {
- desc.addCoprocessor(coprocessor, null, ++priority, null);
- }
- hBaseAdmin.createTable(desc);
- testUtil.waitTableAvailable(tableName, 5000);
- return new HTable(testUtil.getConfiguration(), tableName);
- }
-
/**
* Test transactional put and get requests.
*
@@ -406,7 +352,7 @@ public class TransactionAwareHTableTest {
public void testAttributesPreserved() throws Exception {
HTable hTable = createTable(Bytes.toBytes("TestAttributesPreserved"),
new byte[][]{TestBytes.family, TestBytes.family2}, false,
- Lists.newArrayList(TestRegionObserver.class.getName()));
+ Lists.newArrayList(TransactionProcessor.class.getName(), TestRegionObserver.class.getName()));
try {
TransactionAwareHTable txTable = new TransactionAwareHTable(hTable);
TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
@@ -1117,7 +1063,7 @@ public class TransactionAwareHTableTest {
TransactionAwareHTable txTable =
new TransactionAwareHTable(createTable(Bytes.toBytes("testExistingData"), new byte[][]{TestBytes.family}, true,
- Collections.<String>emptyList()));
+ Collections.singletonList(TransactionProcessor.class.getName())));
TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
// Add some pre-existing, non-transactional data
@@ -1266,8 +1212,9 @@ public class TransactionAwareHTableTest {
@Test
public void testVisibilityAll() throws Exception {
- HTable nonTxTable = createTable(Bytes.toBytes("testVisibilityAll"),
- new byte[][]{TestBytes.family, TestBytes.family2}, true, Collections.<String>emptyList());
+ HTable nonTxTable =
+ createTable(Bytes.toBytes("testVisibilityAll"), new byte[][]{TestBytes.family, TestBytes.family2},
+ true, Collections.singletonList(TransactionProcessor.class.getName()));
TransactionAwareHTable txTable =
new TransactionAwareHTable(nonTxTable,
TxConstants.ConflictDetection.ROW); // ROW conflict detection to verify family deletes
[2/3] incubator-tephra git commit: TEPHRA-35 Save compaction state
for pruning invalid list
Posted by po...@apache.org.
TEPHRA-35 Save compaction state for pruning invalid list
This closes #19
Signed-off-by: poorna <po...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/2ae70328
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/2ae70328
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/2ae70328
Branch: refs/heads/master
Commit: 2ae70328e0a5581f8b37ea694d8bd955fcdc259d
Parents: e56f635
Author: poorna <po...@cask.co>
Authored: Fri Oct 28 15:12:23 2016 -0700
Committer: poorna <po...@apache.org>
Committed: Tue Nov 8 17:30:47 2016 -0800
----------------------------------------------------------------------
.../java/org/apache/tephra/TxConstants.java | 10 +
.../java/org/apache/tephra/util/TxUtils.java | 17 ++
.../org/apache/tephra/util/TxUtilsTest.java | 22 ++
.../hbase/coprocessor/TransactionProcessor.java | 33 ++-
.../coprocessor/janitor/CompactionState.java | 92 ++++++++
.../coprocessor/janitor/DataJanitorState.java | 71 +++++++
.../tephra/hbase/AbstractHBaseTableTest.java | 4 +-
.../tephra/hbase/InvalidListPruneTest.java | 210 +++++++++++++++++++
8 files changed, 456 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2ae70328/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
index b9a7929..25451b3 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
@@ -345,4 +345,14 @@ public class TxConstants {
public static final byte CURRENT_VERSION = 3;
}
+ /**
+ * Configuration for data janitor
+ */
+ public static final class DataJanitor {
+ public static final String PRUNE_ENABLE = "data.tx.prune.enable";
+ public static final String PRUNE_STATE_TABLE = "data.tx.prune.state.table";
+
+ public static final boolean DEFAULT_PRUNE_ENABLE = false;
+ public static final String DEFAULT_PRUNE_STATE_TABLE = "data_tx_janitor_state";
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2ae70328/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
index 08b1545..b3d4ace 100644
--- a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
+++ b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
@@ -149,4 +149,21 @@ public class TxUtils {
public static boolean isPreExistingVersion(long version) {
return version < MAX_NON_TX_TIMESTAMP;
}
+
+ /**
+ * Returns the maximum transaction that can be removed from the invalid list for the state represented by the given
+ * transaction.
+ */
+ public static long getPruneUpperBound(Transaction tx) {
+ // If there are no invalid transactions, and no in-progress transactions then we can prune the invalid list
+ // up to the current read pointer
+ if (tx.getInvalids().length == 0 && tx.getInProgress().length == 0) {
+ return tx.getReadPointer() - 1;
+ }
+
+ long maxInvalidTx =
+ tx.getInvalids().length > 0 ? tx.getInvalids()[tx.getInvalids().length - 1] : Transaction.NO_TX_IN_PROGRESS;
+ long firstInProgress = tx.getFirstInProgress();
+ return Math.min(maxInvalidTx, firstInProgress - 1);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2ae70328/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java b/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java
index 7743105..db687fe 100644
--- a/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java
@@ -19,6 +19,7 @@
package org.apache.tephra.util;
import org.apache.tephra.Transaction;
+import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -32,4 +33,25 @@ public class TxUtilsTest {
// make sure we don't overflow with MAX_VALUE write pointer
assertEquals(Long.MAX_VALUE, TxUtils.getMaxVisibleTimestamp(Transaction.ALL_VISIBLE_LATEST));
}
+
+ @Test
+ public void testPruneUpperBound() {
+ Transaction tx = new Transaction(100, 100, new long[] {10, 30}, new long[] {80, 90}, 80);
+ Assert.assertEquals(30, TxUtils.getPruneUpperBound(tx));
+
+ tx = new Transaction(100, 100, new long[] {10, 95}, new long[] {80, 90}, 80);
+ Assert.assertEquals(79, TxUtils.getPruneUpperBound(tx));
+
+ tx = new Transaction(100, 110, new long[] {10}, new long[] {}, Transaction.NO_TX_IN_PROGRESS);
+ Assert.assertEquals(10, TxUtils.getPruneUpperBound(tx));
+
+ tx = new Transaction(100, 110, new long[] {}, new long[] {60}, 60);
+ Assert.assertEquals(59, TxUtils.getPruneUpperBound(tx));
+
+ tx = new Transaction(100, 110, new long[] {}, new long[] {50}, 50);
+ Assert.assertEquals(49, TxUtils.getPruneUpperBound(tx));
+
+ tx = new Transaction(100, 110, new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS);
+ Assert.assertEquals(99, TxUtils.getPruneUpperBound(tx));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2ae70328/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 14941b3..9f723d6 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -54,6 +56,7 @@ import org.apache.tephra.TransactionCodec;
import org.apache.tephra.TxConstants;
import org.apache.tephra.coprocessor.TransactionStateCache;
import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
+import org.apache.tephra.hbase.coprocessor.janitor.CompactionState;
import org.apache.tephra.persist.TransactionVisibilityState;
import org.apache.tephra.util.TxUtils;
@@ -99,6 +102,7 @@ public class TransactionProcessor extends BaseRegionObserver {
private TransactionStateCache cache;
private final TransactionCodec txCodec;
+ private CompactionState compactionState;
protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;
@@ -138,6 +142,16 @@ public class TransactionProcessor extends BaseRegionObserver {
if (readNonTxnData) {
LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString());
}
+
+ boolean pruneEnabled = env.getConfiguration().getBoolean(TxConstants.DataJanitor.PRUNE_ENABLE,
+ TxConstants.DataJanitor.DEFAULT_PRUNE_ENABLE);
+ if (pruneEnabled) {
+ String pruneTable = env.getConfiguration().get(TxConstants.DataJanitor.PRUNE_STATE_TABLE,
+ TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE);
+ compactionState = new CompactionState(env, TableName.valueOf(pruneTable));
+ LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " +
+ pruneTable);
+ }
}
}
@@ -268,10 +282,27 @@ public class TransactionProcessor extends BaseRegionObserver {
List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
CompactionRequest request)
throws IOException {
- return createStoreScanner(c.getEnvironment(), "compaction", cache.getLatestState(), store, scanners,
+ // Get the latest tx snapshot state for the compaction
+ TransactionVisibilityState snapshot = cache.getLatestState();
+
+ // Record tx state before the compaction
+ if (compactionState != null) {
+ compactionState.record(request, snapshot);
+ }
+ // Also make sure to use the same snapshot for the compaction
+ return createStoreScanner(c.getEnvironment(), "compaction", snapshot, store, scanners,
scanType, earliestPutTs);
}
+ @Override
+ public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile,
+ CompactionRequest request) throws IOException {
+ // Persist the compaction state after a successful compaction
+ if (compactionState != null) {
+ compactionState.persist();
+ }
+ }
+
protected InternalScanner createStoreScanner(RegionCoprocessorEnvironment env, String action,
TransactionVisibilityState snapshot, Store store,
List<? extends KeyValueScanner> scanners, ScanType type,
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2ae70328/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/CompactionState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/CompactionState.java
new file mode 100644
index 0000000..d02456a
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/CompactionState.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor.janitor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.apache.tephra.util.TxUtils;
+
+import java.io.IOException;
+import javax.annotation.Nullable;
+
+/**
+ * Record compaction state for invalid list pruning
+ */
+public class CompactionState {
+ private static final Log LOG = LogFactory.getLog(CompactionState.class);
+
+ private final byte[] regionName;
+ private final String regionNameAsString;
+ private final TableName stateTable;
+ private final DataJanitorState dataJanitorState;
+ private volatile long pruneUpperBound = -1;
+
+ public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable) {
+ this.regionName = env.getRegionInfo().getRegionName();
+ this.regionNameAsString = env.getRegionInfo().getRegionNameAsString();
+ this.stateTable = stateTable;
+ this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+ @Override
+ public Table get() throws IOException {
+ return env.getTable(stateTable);
+ }
+ });
+ }
+
+ /**
+ * Records the transaction state used for a compaction. This method is called when the compaction starts.
+ *
+ * @param request {@link CompactionRequest} for the compaction
+ * @param snapshot transaction state that will be used for the compaction
+ */
+ public void record(CompactionRequest request, @Nullable TransactionVisibilityState snapshot) {
+ if (request.isMajor() && snapshot != null) {
+ Transaction tx = TxUtils.createDummyTransaction(snapshot);
+ pruneUpperBound = TxUtils.getPruneUpperBound(tx);
+ LOG.debug(
+ String.format("Computed prune upper bound %s for compaction request %s using transaction state from time %s",
+ pruneUpperBound, request, snapshot.getTimestamp()));
+ } else {
+ pruneUpperBound = -1;
+ }
+ }
+
+ /**
+ * Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}.
+ * This method is called after the compaction has successfully completed.
+ */
+ public void persist() {
+ if (pruneUpperBound != -1) {
+ try {
+ dataJanitorState.savePruneUpperBound(regionName, pruneUpperBound);
+ LOG.debug(String.format("Saved prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
+ } catch (IOException e) {
+ LOG.warn(String.format("Cannot record prune upper bound in table %s after compacting region %s",
+ stateTable, regionNameAsString), e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2ae70328/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
new file mode 100644
index 0000000..9d4f279
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor.janitor;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+
+/**
+ * Persist data janitor state into an HBase table.
+ */
+public class DataJanitorState {
+ public static final byte[] FAMILY = {'f'};
+ private static final byte[] PRUNE_UPPER_BOUND_COL = {'u'};
+ private static final byte[] REGION_KEY_PREFIX = {0x1};
+
+ private final TableSupplier stateTableSupplier;
+
+
+ public DataJanitorState(TableSupplier stateTableSupplier) {
+ this.stateTableSupplier = stateTableSupplier;
+ }
+
+ public void savePruneUpperBound(byte[] regionId, long pruneUpperBound) throws IOException {
+ try (Table stateTable = stateTableSupplier.get()) {
+ Put put = new Put(makeRegionKey(regionId));
+ put.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL, Bytes.toBytes(pruneUpperBound));
+ stateTable.put(put);
+ }
+ }
+
+ public long getPruneUpperBound(byte[] regionId) throws IOException {
+ try (Table stateTable = stateTableSupplier.get()) {
+ Get get = new Get(makeRegionKey(regionId));
+ get.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+ byte[] result = stateTable.get(get).getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
+ return result == null ? -1 : Bytes.toLong(result);
+ }
+ }
+
+ private byte[] makeRegionKey(byte[] regionId) {
+ return Bytes.add(REGION_KEY_PREFIX, regionId);
+ }
+
+ /**
+ * Supplies table for persisting state
+ */
+ public interface TableSupplier {
+ Table get() throws IOException;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2ae70328/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
index 68c43ae..cb8d695 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
@@ -36,7 +36,7 @@ import java.util.Collections;
import java.util.List;
/**
- *
+ * Base class for tests that need a HBase cluster
*/
@SuppressWarnings("WeakerAccess")
public abstract class AbstractHBaseTableTest {
@@ -46,7 +46,7 @@ public abstract class AbstractHBaseTableTest {
@BeforeClass
public static void startMiniCluster() throws Exception {
- testUtil = new HBaseTestingUtility();
+ testUtil = conf == null ? new HBaseTestingUtility() : new HBaseTestingUtility(conf);
conf = testUtil.getConfiguration();
// Tune down the connection thread pool size
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2ae70328/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/InvalidListPruneTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/InvalidListPruneTest.java
new file mode 100644
index 0000000..ebf58eb
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/InvalidListPruneTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedMap;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.TransactionStateCache;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.hbase.coprocessor.janitor.DataJanitorState;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.TransactionSnapshot;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+
+/**
+ * Test invalid list pruning
+ */
+public class InvalidListPruneTest extends AbstractHBaseTableTest {
+ private static final byte[] family = Bytes.toBytes("f1");
+ private static final byte[] qualifier = Bytes.toBytes("col1");
+
+ private static TableName dataTable;
+ private static TableName pruneStateTable;
+
+ // Override AbstractHBaseTableTest.startMiniCluster to setup configuration
+ @BeforeClass
+ public static void startMiniCluster() throws Exception {
+ // Setup the configuration to start HBase cluster with the invalid list pruning enabled
+ conf = HBaseConfiguration.create();
+ conf.setBoolean(TxConstants.DataJanitor.PRUNE_ENABLE, true);
+ AbstractHBaseTableTest.startMiniCluster();
+
+ TransactionStateStorage txStateStorage = new InMemoryTransactionStateStorage();
+ TransactionManager txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
+ txManager.startAndWait();
+
+ // Do some transactional data operations
+ dataTable = TableName.valueOf("invalidListPruneTestTable");
+ HTable hTable = createTable(dataTable.getName(), new byte[][]{family}, false,
+ Collections.singletonList(TestTransactionProcessor.class.getName()));
+ try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
+ TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
+ txContext.start();
+ for (int i = 0; i < 10; ++i) {
+ txTable.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, Bytes.toBytes(i)));
+ }
+ txContext.finish();
+ }
+
+ testUtil.flush(dataTable);
+ txManager.stopAndWait();
+
+ pruneStateTable = TableName.valueOf(conf.get(TxConstants.DataJanitor.PRUNE_STATE_TABLE,
+ TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE));
+ }
+
+ @AfterClass
+ public static void shutdownAfterClass() throws Exception {
+ hBaseAdmin.disableTable(dataTable);
+ hBaseAdmin.deleteTable(dataTable);
+ }
+
+ @Before
+ public void beforeTest() throws Exception {
+ HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
+ // Prune state table is a non-transactional table, hence no transaction co-processor
+ Collections.<String>emptyList());
+ table.close();
+ }
+
+ @After
+ public void afterTest() throws Exception {
+ hBaseAdmin.disableTable(pruneStateTable);
+ hBaseAdmin.deleteTable(pruneStateTable);
+ }
+
+ @Test
+ public void testRecordCompactionState() throws Exception {
+ DataJanitorState dataJanitorState =
+ new DataJanitorState(new DataJanitorState.TableSupplier() {
+ @Override
+ public Table get() throws IOException {
+ return testUtil.getConnection().getTable(pruneStateTable);
+ }
+ });
+
+ // No prune upper bound initially
+ Assert.assertEquals(-1, dataJanitorState.getPruneUpperBound(getRegionName(dataTable, Bytes.toBytes(0))));
+
+ // Create a new transaction snapshot
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
+ ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+ // Run minor compaction
+ testUtil.compact(dataTable, false);
+ // No prune upper bound after minor compaction too
+ Assert.assertEquals(-1, dataJanitorState.getPruneUpperBound(getRegionName(dataTable, Bytes.toBytes(0))));
+
+ // Run major compaction, and verify prune upper bound
+ testUtil.compact(dataTable, true);
+ Assert.assertEquals(50, dataJanitorState.getPruneUpperBound(getRegionName(dataTable, Bytes.toBytes(0))));
+
+ // Run major compaction again with same snapshot, prune upper bound should not change
+ testUtil.compact(dataTable, true);
+ Assert.assertEquals(50, dataJanitorState.getPruneUpperBound(getRegionName(dataTable, Bytes.toBytes(0))));
+
+ // Create a new transaction snapshot
+ InMemoryTransactionStateCache.setTransactionSnapshot(
+ new TransactionSnapshot(110, 111, 112, ImmutableSet.of(150L),
+ ImmutableSortedMap.of(
+ 105L, new TransactionManager.InProgressTx(100, 30, TransactionType.SHORT)
+ )
+ ));
+ Assert.assertEquals(50, dataJanitorState.getPruneUpperBound(getRegionName(dataTable, Bytes.toBytes(0))));
+
+ // Run major compaction again, now prune upper bound should change
+ testUtil.compact(dataTable, true);
+ Assert.assertEquals(104, dataJanitorState.getPruneUpperBound(getRegionName(dataTable, Bytes.toBytes(0))));
+ }
+
+ private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException {
+ HRegionLocation regionLocation =
+ testUtil.getConnection().getRegionLocator(dataTable).getRegionLocation(row);
+ return regionLocation.getRegionInfo().getRegionName();
+ }
+
+ /**
+ * A transaction co-processor that uses in-memory {@link TransactionSnapshot} for testing
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static class TestTransactionProcessor extends TransactionProcessor {
+ @Override
+ protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+ return new Supplier<TransactionStateCache>() {
+ @Override
+ public TransactionStateCache get() {
+ return new InMemoryTransactionStateCache();
+ }
+ };
+ }
+ }
+
+ /**
+ * Used to supply in-memory {@link TransactionSnapshot} to {@link TestTransactionProcessor} for testing
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static class InMemoryTransactionStateCache extends TransactionStateCache {
+ private static TransactionVisibilityState transactionSnapshot;
+
+ public static void setTransactionSnapshot(TransactionVisibilityState transactionSnapshot) {
+ InMemoryTransactionStateCache.transactionSnapshot = transactionSnapshot;
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ // Nothing to do
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ // Nothing to do
+ }
+
+ @Override
+ public TransactionVisibilityState getLatestState() {
+ return transactionSnapshot;
+ }
+ }
+}