You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2018/03/10 01:56:33 UTC
[4/6] incubator-tephra git commit: TEPHRA-272 Add HBase 2.0
compatibility module
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/db6ef6d2/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
new file mode 100644
index 0000000..42c9f84
--- /dev/null
+++ b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.txprune.TransactionPruningPlugin;
+import org.apache.tephra.util.TxUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Default implementation of the {@link TransactionPruningPlugin} for HBase.
+ *
+ * This plugin determines the prune upper bound for transactional HBase tables that use
+ * coprocessor {@link TransactionProcessor}.
+ *
+ * <h3>State storage:</h3>
+ *
+ * This plugin expects the TransactionProcessor to save the prune upper bound for invalid transactions
+ * after every major compaction of a region. Let's call this <i>(region, prune upper bound)</i>.
+ * In addition, the plugin also persists the following information on a run at time <i>t</i>
+ * <ul>
+ * <li>
+ * <i>(t, set of regions)</i>: Set of transactional regions at time <i>t</i>.
+ * Transactional regions are regions of the tables that have the coprocessor TransactionProcessor
+ * attached to them.
+ * </li>
+ * <li>
+ * <i>(t, inactive transaction bound)</i>: This is the smallest not in-progress transaction that
+ * will not have writes in any HBase regions that are created after time <i>t</i>.
+ * This value is determined by the Transaction Service based on the transaction state at time <i>t</i>
+ * and passed on to the plugin.
+ * </li>
+ * </ul>
+ *
+ * <h3>Computing prune upper bound:</h3>
+ *
+ * In a typical HBase instance, there can be a constant change in the number of regions due to region creations,
+ * splits and merges. At any given time there can always be a region on which a major compaction has not been run.
+ * Since the prune upper bound will get recorded for a region only after a major compaction,
+ * using only the latest set of regions we may not be able to find the
+ * prune upper bounds for all the current regions. Hence we persist the set of regions that exist at the time
+ * of each run of the plugin, and use historical region set for time <i>t</i>, <i>t - 1</i>, etc.
+ * to determine the prune upper bound.
+ *
+ * From the regions saved at time <i>t</i>, <i>t - 1</i>, etc.,
+ * the plugin tries to find the latest <i>(t, set of regions)</i> where all regions have been major compacted,
+ * i.e., all regions have a prune upper bound recorded in <i>(region, prune upper bound)</i>.
+ * <br/>
+ * If such a set is found for time <i>t1</i>, the prune upper bound returned by the plugin is the minimum of
+ * <ul>
+ * <li>Prune upper bounds of regions in set <i>(t1, set of regions)</i></li>
+ * <li>Inactive transaction bound from <i>(t1, inactive transaction bound)</i></li>
+ * </ul>
+ *
+ * <p/>
+ * Above, when we find <i>(t1, set of regions)</i>, there may be a region that was created after time <i>t1</i>,
+ * but has a data write from an invalid transaction that is smaller than the prune upper bounds of all
+ * regions in <i>(t1, set of regions)</i>. This is possible because <i>(region, prune upper bound)</i> persisted by
+ * TransactionProcessor is always the latest prune upper bound for a region.
+ * <br/>
+ * However a region created after time <i>t1</i> cannot have writes from an invalid transaction that is smaller than
+ * inactive transaction bound at the time the region was created.
+ * Since we limit the plugin prune upper bound using <i>(t1, inactive transaction bound)</i>,
+ * there should be no invalid transactions smaller than the plugin prune upper bound with writes in any
+ * transactional region of this HBase instance.
+ *
+ * <p/>
+ * Note: If your tables use a transactional coprocessor other than TransactionProcessor,
+ * then you may need to write a new plugin to compute prune upper bound for those tables.
+ */
+@SuppressWarnings("WeakerAccess")
+public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
+  public static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionPruningPlugin.class);
+
+  protected Configuration conf;
+  protected Connection connection;
+  protected DataJanitorState dataJanitorState;
+
+  /**
+   * Opens the HBase connection used by this plugin, creates the prune state table if it does not exist yet,
+   * and sets up the {@link DataJanitorState} that reads and writes pruning state in that table.
+   * If any of the steps after opening the connection fails, the connection is closed before the exception
+   * is rethrown.
+   *
+   * @param conf configuration used to create the HBase connection and to look up the prune state table name
+   * @throws IOException if the connection or the prune state table cannot be created
+   */
+  @Override
+  public void initialize(Configuration conf) throws IOException {
+    this.conf = conf;
+    this.connection = ConnectionFactory.createConnection(conf);
+
+    try {
+      final TableName stateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                                              TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+      LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
+      createPruneTable(stateTable);
+      this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+        @Override
+        public Table get() throws IOException {
+          return connection.getTable(stateTable);
+        }
+      });
+    } catch (IOException | RuntimeException e) {
+      // Do not leak the connection when initialization fails; destroy() treats a null connection as a no-op
+      try {
+        connection.close();
+      } catch (IOException closeException) {
+        e.addSuppressed(closeException);
+      }
+      connection = null;
+      throw e;
+    }
+  }
+
+  /**
+   * Determines the prune upper bound for the data store as described in the class documentation.
+   * When transactional regions exist, this first records them, followed by the inactive transaction bound,
+   * for {@code time}. The bound is written last so its presence marks the data for {@code time} as complete.
+   *
+   * @param time time of this run; a negative value yields -1
+   * @param inactiveTransactionBound inactive transaction bound at {@code time}; a negative value yields -1
+   * @return the prune upper bound, or -1 if none could be determined
+   * @throws IOException when not able to talk to HBase
+   */
+  @Override
+  public long fetchPruneUpperBound(long time, long inactiveTransactionBound) throws IOException {
+    LOG.debug("Fetching prune upper bound for time {} and inactive transaction bound {}",
+              time, inactiveTransactionBound);
+    if (time < 0 || inactiveTransactionBound < 0) {
+      return -1;
+    }
+
+    // Get all the current transactional regions
+    SortedSet<byte[]> transactionalRegions = getTransactionalRegions();
+    if (!transactionalRegions.isEmpty()) {
+      LOG.debug("Saving {} transactional regions for time {}", transactionalRegions.size(), time);
+      dataJanitorState.saveRegionsForTime(time, transactionalRegions);
+      // Save inactive transaction bound for time as the final step.
+      // We can then use its existence to make sure that the data for a given time is complete or not
+      LOG.debug("Saving inactive transaction bound {} for time {}", inactiveTransactionBound, time);
+      dataJanitorState.saveInactiveTransactionBoundForTime(time, inactiveTransactionBound);
+    }
+
+    return computePruneUpperBound(new TimeRegions(time, transactionalRegions));
+  }
+
+  /**
+   * After the invalid list has been pruned, this cleans up state information that is no longer required.
+   * This includes -
+   * <ul>
+   *   <li>
+   *     <i>(region, prune upper bound)</i> - prune upper bounds smaller than maxPrunedInvalid, for regions
+   *     that are not in the region set recorded on or before {@code time}
+   *   </li>
+   *   <li>
+   *     <i>(t, set of regions)</i> - region sets that were recorded on or before the start time
+   *     of maxPrunedInvalid
+   *   </li>
+   *   <li>
+   *     <i>(t, inactive transaction bound)</i> - smallest not in-progress transaction without any writes in
+   *     new regions, recorded on or before the start time of maxPrunedInvalid
+   *   </li>
+   *   <li>
+   *     empty regions recorded on or before the start time of maxPrunedInvalid
+   *   </li>
+   * </ul>
+   *
+   * @param time time of the pruning run; a negative value makes this a no-op
+   * @param maxPrunedInvalid the largest invalid transaction that was pruned; a negative value makes this a no-op
+   * @throws IOException when not able to talk to HBase
+   */
+  @Override
+  public void pruneComplete(long time, long maxPrunedInvalid) throws IOException {
+    LOG.debug("Prune complete for time {} and prune upper bound {}", time, maxPrunedInvalid);
+    if (time < 0 || maxPrunedInvalid < 0) {
+      return;
+    }
+
+    // Get regions for the current time, so as to not delete the prune upper bounds for them.
+    // The prune upper bounds for regions are recorded by TransactionProcessor and the deletion
+    // is done by this class. To avoid update/delete race condition, we only delete prune upper
+    // bounds for the stale regions.
+    TimeRegions regionsToExclude = dataJanitorState.getRegionsOnOrBeforeTime(time);
+    if (regionsToExclude != null) {
+      LOG.debug("Deleting prune upper bounds smaller than {} for stale regions", maxPrunedInvalid);
+      dataJanitorState.deletePruneUpperBounds(maxPrunedInvalid, regionsToExclude.getRegions());
+    } else {
+      LOG.warn("Cannot find saved regions on or before time {}", time);
+    }
+    long pruneTime = TxUtils.getTimestamp(maxPrunedInvalid);
+    LOG.debug("Deleting regions recorded on or before time {}", pruneTime);
+    dataJanitorState.deleteAllRegionsOnOrBeforeTime(pruneTime);
+    LOG.debug("Deleting inactive transaction bounds recorded on or before time {}", pruneTime);
+    dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(pruneTime);
+    LOG.debug("Deleting empty regions recorded on or before time {}", pruneTime);
+    dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(pruneTime);
+  }
+
+  /**
+   * Closes the HBase connection opened by {@link #initialize(Configuration)}. Errors while closing are logged,
+   * not thrown. Does nothing if no connection is open, e.g. when initialize was never called or failed.
+   */
+  @Override
+  public void destroy() {
+    LOG.info("Stopping plugin...");
+    if (connection == null) {
+      return;
+    }
+    try {
+      connection.close();
+    } catch (IOException e) {
+      LOG.error("Got exception while closing HBase connection", e);
+    }
+  }
+
+  /**
+   * Create the prune state table given the {@link TableName} if the table doesn't exist already.
+   * A concurrent creation by another client is tolerated.
+   *
+   * @param stateTable prune state table name
+   * @throws IOException when not able to talk to HBase
+   */
+  protected void createPruneTable(TableName stateTable) throws IOException {
+    try (Admin admin = this.connection.getAdmin()) {
+      if (admin.tableExists(stateTable)) {
+        LOG.debug("Not creating pruneStateTable {} since it already exists.",
+                  stateTable.getNameWithNamespaceInclAsString());
+        return;
+      }
+
+      HTableDescriptor htd = new HTableDescriptor(stateTable);
+      htd.addFamily(new HColumnDescriptor(DataJanitorState.FAMILY).setMaxVersions(1));
+      admin.createTable(htd);
+      LOG.info("Created pruneTable {}", stateTable.getNameWithNamespaceInclAsString());
+    } catch (TableExistsException ex) {
+      // Expected if the prune state table is being created at the same time by another client
+      LOG.debug("Not creating pruneStateTable {} since it already exists.",
+                stateTable.getNameWithNamespaceInclAsString(), ex);
+    }
+  }
+
+  /**
+   * Returns whether the table is a transactional table. By default, a table is identified as transactional
+   * if it has the coprocessor {@link TransactionProcessor} attached to it. Should be overridden if users
+   * attach a different coprocessor.
+   *
+   * @param tableDescriptor {@link HTableDescriptor} of the table
+   * @return true if the table is transactional
+   */
+  protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
+    return tableDescriptor.hasCoprocessor(TransactionProcessor.class.getName());
+  }
+
+  /**
+   * Collects the region names of all tables for which {@link #isTransactionalTable(HTableDescriptor)} is true.
+   *
+   * @return region names, sorted with {@link Bytes#BYTES_COMPARATOR}; empty if there are none
+   * @throws IOException when not able to talk to HBase
+   */
+  protected SortedSet<byte[]> getTransactionalRegions() throws IOException {
+    SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    try (Admin admin = connection.getAdmin()) {
+      HTableDescriptor[] tableDescriptors = admin.listTables();
+      LOG.debug("Got {} tables to process", tableDescriptors == null ? 0 : tableDescriptors.length);
+      if (tableDescriptors != null) {
+        for (HTableDescriptor tableDescriptor : tableDescriptors) {
+          if (isTransactionalTable(tableDescriptor)) {
+            List<HRegionInfo> tableRegions = admin.getTableRegions(tableDescriptor.getTableName());
+            LOG.debug("Regions for table {}: {}", tableDescriptor.getTableName(), tableRegions);
+            if (tableRegions != null) {
+              for (HRegionInfo region : tableRegions) {
+                regions.add(region.getRegionName());
+              }
+            }
+          } else {
+            LOG.debug("{} is not a transactional table", tableDescriptor.getTableName());
+          }
+        }
+      }
+    }
+    return regions;
+  }
+
+  /**
+   * Try to find the latest set of regions in which all regions have been major compacted, and
+   * compute prune upper bound from them. Starting from newest to oldest, this looks into the
+   * region set that has been saved periodically, and joins it with the prune upper bound data
+   * for a region recorded after a major compaction.
+   *
+   * @param timeRegions the latest set of regions
+   * @return prune upper bound, or -1 if no recorded region set qualifies
+   * @throws IOException when not able to talk to HBase
+   */
+  private long computePruneUpperBound(TimeRegions timeRegions) throws IOException {
+    // Get the tables for the current time from the latest regions set
+    final Set<TableName> existingTables = getTableNamesForRegions(timeRegions.getRegions());
+    LOG.debug("Tables for time {} = {}", timeRegions.getTime(), existingTables);
+
+    // Walk backwards through the recorded region sets. Stepping to the previous set lives in the for-update
+    // clause so that every path through the body, including 'continue', advances to an older region set.
+    for (TimeRegions current = timeRegions; current != null;
+         current = dataJanitorState.getRegionsOnOrBeforeTime(current.getTime() - 1)) {
+      LOG.debug("Computing prune upper bound for {}", current);
+      SortedSet<byte[]> transactionalRegions = current.getRegions();
+      long time = current.getTime();
+
+      long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
+      LOG.debug("Got inactive transaction bound {}", inactiveTransactionBound);
+      // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
+      if (inactiveTransactionBound == -1) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
+                      "and hence the data must be incomplete", time);
+        }
+        continue;
+      }
+
+      // Remove non-existing tables from the transactional regions set, so that we don't lookup prune upper bounds
+      // for them. Since the deleted tables do not exist anymore, there is no need to make sure they have been
+      // compacted. This ensures that transient tables do not block pruning progress.
+      transactionalRegions = filterDeletedTableRegions(existingTables, transactionalRegions);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Transactional regions after removing the regions of non-existing tables = {}",
+                  Iterables.transform(transactionalRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+      }
+
+      // Get the prune upper bounds for all the transactional regions
+      Map<byte[], Long> pruneUpperBoundRegions =
+        dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
+      logPruneUpperBoundRegions(pruneUpperBoundRegions);
+
+      // Use inactiveTransactionBound as the prune upper bound for the empty regions since the regions that are
+      // recorded as empty after inactiveTransactionBoundTime will not have invalid data
+      // for transactions started on or before inactiveTransactionBoundTime
+      pruneUpperBoundRegions = handleEmptyRegions(inactiveTransactionBound, transactionalRegions,
+                                                  pruneUpperBoundRegions);
+
+      // If prune upper bounds are found for all the transactional regions, then compute the prune upper bound
+      // across all regions
+      if (!transactionalRegions.isEmpty() &&
+        pruneUpperBoundRegions.size() == transactionalRegions.size()) {
+        Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
+        long pruneUpperBound = Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
+        LOG.debug("Found prune upper bound {} for time {}", pruneUpperBound, time);
+        return pruneUpperBound;
+      }
+
+      if (LOG.isDebugEnabled()) {
+        Sets.SetView<byte[]> difference =
+          Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
+        LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}",
+                  time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN));
+      }
+    }
+    return -1;
+  }
+
+  /**
+   * Returns a live filtered view of {@code transactionalRegions} that keeps only regions of tables contained in
+   * {@code existingTables}.
+   */
+  private SortedSet<byte[]> filterDeletedTableRegions(final Set<TableName> existingTables,
+                                                      SortedSet<byte[]> transactionalRegions) {
+    return Sets.filter(transactionalRegions,
+                       new Predicate<byte[]>() {
+                         @Override
+                         public boolean apply(byte[] region) {
+                           return existingTables.contains(HRegionInfo.getTable(region));
+                         }
+                       });
+  }
+
+  /**
+   * Returns the distinct table names that the given region names belong to.
+   */
+  private Set<TableName> getTableNamesForRegions(Set<byte[]> regions) {
+    Set<TableName> tableNames = new HashSet<>(regions.size());
+    for (byte[] region : regions) {
+      tableNames.add(HRegionInfo.getTable(region));
+    }
+    return tableNames;
+  }
+
+  /**
+   * Returns an unmodifiable copy of {@code pruneUpperBoundRegions} in which every region recorded as empty after
+   * the timestamp of {@code inactiveTransactionBound}, and not already present, is mapped to
+   * {@code inactiveTransactionBound}.
+   */
+  private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound,
+                                               SortedSet<byte[]> transactionalRegions,
+                                               Map<byte[], Long> pruneUpperBoundRegions) throws IOException {
+    long inactiveTransactionBoundTime = TxUtils.getTimestamp(inactiveTransactionBound);
+    SortedSet<byte[]> emptyRegions =
+      dataJanitorState.getEmptyRegionsAfterTime(inactiveTransactionBoundTime, transactionalRegions);
+    LOG.debug("Got empty transactional regions for inactive transaction bound time {}: {}",
+              inactiveTransactionBoundTime, Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+
+    // The regions that are recorded as empty after inactiveTransactionBoundTime will not have invalid data
+    // for transactions started before or on inactiveTransactionBoundTime. Hence we can consider the prune upper bound
+    // for these empty regions as inactiveTransactionBound
+    Map<byte[], Long> pubWithEmptyRegions = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    pubWithEmptyRegions.putAll(pruneUpperBoundRegions);
+    for (byte[] emptyRegion : emptyRegions) {
+      if (!pruneUpperBoundRegions.containsKey(emptyRegion)) {
+        pubWithEmptyRegions.put(emptyRegion, inactiveTransactionBound);
+      }
+    }
+    return Collections.unmodifiableMap(pubWithEmptyRegions);
+  }
+
+  /**
+   * Logs the region to prune upper bound map at debug level, rendering region names as strings.
+   */
+  private void logPruneUpperBoundRegions(Map<byte[], Long> pruneUpperBoundRegions) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Got region - prune upper bound map: {}",
+                Iterables.transform(pruneUpperBoundRegions.entrySet(),
+                                    new Function<Map.Entry<byte[], Long>, Map.Entry<String, Long>>() {
+                                      @Override
+                                      public Map.Entry<String, Long> apply(Map.Entry<byte[], Long> input) {
+                                        String regionName = TimeRegions.BYTE_ARR_TO_STRING_FN.apply(input.getKey());
+                                        return Maps.immutableEntry(regionName, input.getValue());
+                                      }
+                                    }));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/db6ef6d2/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java
new file mode 100644
index 0000000..5d7b871
--- /dev/null
+++ b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.txprune.RegionPruneInfo;
+import org.apache.tephra.txprune.hbase.InvalidListPruningDebug;
+import org.apache.tephra.txprune.hbase.RegionsAtTime;
+import org.apache.tephra.util.TimeMathParser;
+import org.apache.tephra.util.TxUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+/**
+ * Invalid List Pruning Debug Tool: inspects the invalid list pruning state recorded in the prune state table.
+ */
+public class InvalidListPruningDebugTool implements InvalidListPruningDebug {
+  private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebugTool.class);
+  private static final Gson GSON = new Gson();
+
+  // Relative time expression for "the current time"; used when a command is given no explicit time
+  private static final String NOW = "now";
+
+  // SimpleDateFormat pattern handed to RegionsAtTime along with the recorded times
+  @VisibleForTesting
+  static final String DATE_FORMAT = "d-MMM-yyyy HH:mm:ss z";
+
+  // Set up by initialize(Configuration)
+  private Connection connection;
+  private TableName tableName;
+  private DataJanitorState dataJanitorState;
+
+ /**
+ * Initialize the Invalid List Debug Tool.
+ * @param conf {@link Configuration}
+ * @throws IOException when not able to create an HBase connection
+ */
+ @Override
+ @SuppressWarnings("WeakerAccess")
+ public void initialize(final Configuration conf) throws IOException {
+ LOG.debug("InvalidListPruningDebugMain : initialize method called");
+ connection = ConnectionFactory.createConnection(conf);
+ tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+ dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+ @Override
+ public Table get() throws IOException {
+ return connection.getTable(tableName);
+ }
+ });
+ }
+
+  /**
+   * Closes the HBase connection opened by {@link #initialize(Configuration)}; does nothing if none was opened.
+   *
+   * @throws IOException if closing the connection fails
+   */
+  @Override
+  @SuppressWarnings("WeakerAccess")
+  public void destroy() throws IOException {
+    if (connection == null) {
+      return;
+    }
+    connection.close();
+  }
+
+  /**
+   * Returns a set of regions that are live but are neither empty nor have a prune upper bound recorded. These
+   * regions will stop the progress of pruning.
+   * <p/>
+   * Note that this can return false positives in the following case -
+   * At time 't' empty regions were recorded, and time 't+1' prune iteration was invoked.
+   * Since a new set of regions was recorded at time 't+1', all regions recorded as empty before time 't + 1' will
+   * now be reported as blocking the pruning, even though they are empty. This is because we cannot tell if those
+   * regions got any new data between time 't' and 't + 1'.
+   *
+   * @param numRegions maximum number of regions to return, or a negative number to return all of them
+   * @param time time in milliseconds or relative time; the region set recorded on or before this time is used
+   * @return {@link Set} of regions that need to be compacted and flushed
+   * @throws IOException if the pruning state cannot be read
+   */
+  @Override
+  @SuppressWarnings("WeakerAccess")
+  public Set<String> getRegionsToBeCompacted(Integer numRegions, String time) throws IOException {
+    // Fetch the regions recorded at the given time
+    RegionsAtTime timeRegion = getRegionsOnOrBeforeTime(time);
+    if (timeRegion.getRegions().isEmpty()) {
+      return Collections.emptySet();
+    }
+
+    Long timestamp = timeRegion.getTime();
+
+    // Retain only the regions that are still live
+    SortedSet<String> liveRegions = getRegionsOnOrBeforeTime(NOW).getRegions();
+    SortedSet<String> regions = Sets.newTreeSet(Sets.intersection(liveRegions, timeRegion.getRegions()));
+
+    // Drop the regions recorded as empty after the time of the region set
+    SortedSet<byte[]> emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null);
+    SortedSet<String> emptyRegionNames =
+      Sets.newTreeSet(Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+    Set<String> nonEmptyRegions = Sets.newHashSet(Sets.difference(regions, emptyRegionNames));
+
+    // Remove the regions that have registered a prune upper bound, leaving the regions that are
+    // not empty and have not been registered a prune upper bound
+    for (RegionPruneInfo prunedRegion : dataJanitorState.getPruneInfoForRegions(null)) {
+      nonEmptyRegions.remove(prunedRegion.getRegionNameAsString());
+    }
+
+    if (numRegions < 0 || numRegions >= nonEmptyRegions.size()) {
+      return nonEmptyRegions;
+    }
+    // numRegions is non-negative here, as Iterables.limit requires
+    return Sets.newHashSet(Iterables.limit(nonEmptyRegions, numRegions));
+  }
+
+  /**
+   * Return the regions that have the lowest prune upper bounds, ordered by prune upper bound and then by region
+   * name. If a negative number is passed in, all such regions are returned. Only regions that have a prune upper
+   * bound and are present both in the latest recorded region set and in the region set recorded on or before
+   * {@code time} are considered.
+   *
+   * @param numRegions maximum number of regions to return, or a negative number to return all of them
+   * @param time time in milliseconds or relative time; the region set recorded on or before this time is used
+   * @return {@link SortedSet} of {@link RegionPruneInfoPretty} in ascending prune upper bound order; empty if
+   *         there are no matching regions or {@code numRegions} is 0
+   * @throws IOException if the pruning state cannot be read
+   */
+  @Override
+  @SuppressWarnings("WeakerAccess")
+  public SortedSet<RegionPruneInfoPretty> getIdleRegions(Integer numRegions, String time) throws IOException {
+    List<RegionPruneInfo> regionPruneInfos = dataJanitorState.getPruneInfoForRegions(null);
+    if (regionPruneInfos.isEmpty()) {
+      return new TreeSet<>();
+    }
+
+    // Regions present both in the latest region set and in the region set at the given time
+    RegionsAtTime latestRegions = getRegionsOnOrBeforeTime(NOW);
+    RegionsAtTime timeRegions = getRegionsOnOrBeforeTime(time);
+    Set<String> liveRegions = Sets.intersection(latestRegions.getRegions(), timeRegions.getRegions());
+
+    // Keep only the prune infos of live regions
+    List<RegionPruneInfo> liveRegionWithPruneInfoList = new ArrayList<>();
+    for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+      if (liveRegions.contains(regionPruneInfo.getRegionNameAsString())) {
+        liveRegionWithPruneInfoList.add(regionPruneInfo);
+      }
+    }
+
+    Comparator<RegionPruneInfo> comparator = new Comparator<RegionPruneInfo>() {
+      @Override
+      public int compare(RegionPruneInfo o1, RegionPruneInfo o2) {
+        int result = Long.compare(o1.getPruneUpperBound(), o2.getPruneUpperBound());
+        if (result == 0) {
+          return o1.getRegionNameAsString().compareTo(o2.getRegionNameAsString());
+        }
+        return result;
+      }
+    };
+    SortedSet<RegionPruneInfoPretty> regions = new TreeSet<>(comparator);
+
+    int limit = numRegions < 0 ? liveRegionWithPruneInfoList.size() : numRegions;
+    // MinMaxPriorityQueue rejects a maximum size of 0, so return early when there is nothing to select
+    if (limit == 0 || liveRegionWithPruneInfoList.isEmpty()) {
+      return regions;
+    }
+
+    // Bounded queue keeps only the 'limit' entries with the lowest prune upper bounds
+    MinMaxPriorityQueue<RegionPruneInfoPretty> lowestPrunes =
+      MinMaxPriorityQueue.orderedBy(comparator).maximumSize(limit).create();
+    for (RegionPruneInfo pruneInfo : liveRegionWithPruneInfoList) {
+      lowestPrunes.add(new RegionPruneInfoPretty(pruneInfo));
+    }
+
+    regions.addAll(lowestPrunes);
+    return regions;
+  }
+
+ /**
+ * Return the prune upper bound value of a given region. If no prune upper bound has been written for this region yet,
+ * it will return a null.
+ *
+ * @param regionId region id
+ * @return {@link RegionPruneInfo} of the region
+ * @throws IOException if there are any errors while trying to fetch the {@link RegionPruneInfo}
+ */
+ @Override
+ @SuppressWarnings("WeakerAccess")
+ @Nullable
+ public RegionPruneInfoPretty getRegionPruneInfo(String regionId) throws IOException {
+ RegionPruneInfo pruneInfo = dataJanitorState.getPruneInfoForRegion(Bytes.toBytesBinary(regionId));
+ return pruneInfo == null ? null : new RegionPruneInfoPretty(pruneInfo);
+ }
+
+  /**
+   * Returns the transactional regions recorded at or before the given time.
+   *
+   * @param timeString time in milliseconds or a relative time expression such as {@code now-1d}
+   * @return the regions and the time they were recorded at; if nothing was recorded, an empty region set
+   *         stamped with the requested time
+   * @throws IOException if there are any errors while trying to fetch the {@link TimeRegions}
+   */
+  @Override
+  @SuppressWarnings("WeakerAccess")
+  public RegionsAtTime getRegionsOnOrBeforeTime(String timeString) throws IOException {
+    long requestedTime = TimeMathParser.parseTime(timeString, TimeUnit.MILLISECONDS);
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+    TimeRegions recorded = dataJanitorState.getRegionsOnOrBeforeTime(requestedTime);
+    if (recorded == null) {
+      return new RegionsAtTime(requestedTime, new TreeSet<String>(), dateFormat);
+    }
+    SortedSet<String> regionNames =
+      Sets.newTreeSet(Iterables.transform(recorded.getRegions(), TimeRegions.BYTE_ARR_TO_STRING_FN));
+    return new RegionsAtTime(recorded.getTime(), regionNames, dateFormat);
+  }
+
+  /**
+   * Writes the command-line help text, one line at a time, to the given writer.
+   */
+  private void printUsage(PrintWriter pw) {
+    String[] usage = {
+      "",
+      "Usage : org.apache.tephra.hbase.txprune.InvalidListPruning <command> <parameters>",
+      "",
+      "Available commands",
+      "------------------",
+      "to-compact-regions limit [time]",
+      "Desc: Prints out the regions that are active, but not empty, " +
+        "and have not registered a prune upper bound.",
+      "",
+      "idle-regions limit [time]",
+      "Desc: Prints out the regions that have the lowest prune upper bounds.",
+      "",
+      "prune-info region-name-as-string",
+      "Desc: Prints the prune upper bound and the time it was recorded for the given region.",
+      "",
+      "time-region [time]",
+      "Desc: Prints out the transactional regions present in HBase recorded at or before the given time.",
+      "",
+      "Parameters",
+      "----------",
+      " * limit - used to limit the number of regions returned, -1 to apply no limit",
+      " * time - if time is not provided, the current time is used. ",
+      " When provided, the data recorded on or before the given time is returned.",
+      " Time can be provided in milliseconds, or can be provided as a relative time.",
+      " Examples for relative time -",
+      " now = current time,",
+      " now-1d = current time - 1 day,",
+      " now-1d+4h = 20 hours before now,",
+      " now+5s = current time + 5 seconds",
+      ""
+    };
+    for (String line : usage) {
+      pw.println(line);
+    }
+  }
+
+  /**
+   * Runs the command named by the first argument and prints its result as JSON.
+   *
+   * @param args the command followed by its parameters (see {@code printUsage})
+   * @param out the writer the result, or an error message and the usage text on invalid input, is printed to
+   * @return {@code true} if the command was executed, {@code false} if the arguments were invalid
+   * @throws IOException if there are any errors while fetching the requested information
+   */
+  @VisibleForTesting
+  boolean execute(String[] args, PrintWriter out) throws IOException {
+    if (args.length < 1) {
+      printUsage(out);
+      return false;
+    }
+
+    String command = args[0];
+    switch (command) {
+      case "time-region":
+        // time-region [time]
+        if (args.length <= 2) {
+          String time = args.length == 2 ? args[1] : NOW;
+          RegionsAtTime timeRegion = getRegionsOnOrBeforeTime(time);
+          out.println(GSON.toJson(timeRegion));
+          return true;
+        }
+        break;
+      case "idle-regions":
+        // idle-regions limit [time]: the limit is mandatory, so args[1] must exist
+        if (args.length == 2 || args.length == 3) {
+          Integer numRegions = parseLimit(args[1], out);
+          if (numRegions != null) {
+            String time = args.length == 3 ? args[2] : NOW;
+            SortedSet<RegionPruneInfoPretty> regionPruneInfos = getIdleRegions(numRegions, time);
+            out.println(GSON.toJson(regionPruneInfos));
+            return true;
+          }
+        }
+        break;
+      case "prune-info":
+        // prune-info region-name-as-string
+        if (args.length == 2) {
+          String regionName = args[1];
+          RegionPruneInfo regionPruneInfo = getRegionPruneInfo(regionName);
+          if (regionPruneInfo != null) {
+            out.println(GSON.toJson(regionPruneInfo));
+          } else {
+            out.println(String.format("No prune info found for the region %s.", regionName));
+          }
+          return true;
+        }
+        break;
+      case "to-compact-regions":
+        // to-compact-regions limit [time]: the limit is mandatory, so args[1] must exist
+        if (args.length == 2 || args.length == 3) {
+          Integer numRegions = parseLimit(args[1], out);
+          if (numRegions != null) {
+            String time = args.length == 3 ? args[2] : NOW;
+            Set<String> toBeCompactedRegions = getRegionsToBeCompacted(numRegions, time);
+            out.println(GSON.toJson(toBeCompactedRegions));
+            return true;
+          }
+        }
+        break;
+    }
+
+    printUsage(out);
+    return false;
+  }
+
+  /**
+   * Parses the {@code limit} parameter of a command.
+   *
+   * @param value the raw argument
+   * @param out the writer an error message is printed to when {@code value} is not an integer
+   * @return the parsed limit, or {@code null} if {@code value} is not a valid integer
+   */
+  private static Integer parseLimit(String value, PrintWriter out) {
+    try {
+      return Integer.parseInt(value);
+    } catch (NumberFormatException e) {
+      out.println(String.format("Invalid limit '%s', expected an integer.", value));
+      return null;
+    }
+  }
+
+  /**
+   * Command-line entry point. Exits with status 1 if the arguments are invalid or the command fails
+   * with an {@link IOException}.
+   */
+  public static void main(String[] args) {
+    Configuration hConf = HBaseConfiguration.create();
+    InvalidListPruningDebugTool pruningDebug = new InvalidListPruningDebugTool();
+    boolean success = false;
+    // System.exit() must not be called inside this try-with-resources: the PrintWriter has no automatic
+    // flushing, so exiting before it is closed would drop the buffered output (e.g. the usage text).
+    try (PrintWriter out = new PrintWriter(System.out)) {
+      pruningDebug.initialize(hConf);
+      try {
+        success = pruningDebug.execute(args, out);
+      } finally {
+        // Release resources acquired by initialize() even if execute() fails
+        pruningDebug.destroy();
+      }
+    } catch (IOException ex) {
+      LOG.error("Received an exception while trying to execute the debug tool. ", ex);
+    }
+    if (!success) {
+      System.exit(1);
+    }
+  }
+
+  /**
+   * {@link RegionPruneInfo} that additionally carries human readable dates for its timestamps, so that
+   * they are included when the object is serialized to JSON.
+   */
+  @SuppressWarnings({"WeakerAccess", "unused"})
+  public static class RegionPruneInfoPretty extends RegionPruneInfo {
+    private final String pruneUpperBoundAsString;
+    private final String pruneRecordTimeAsString;
+
+    public RegionPruneInfoPretty(RegionPruneInfo regionPruneInfo) {
+      this(regionPruneInfo.getRegionName(), regionPruneInfo.getRegionNameAsString(),
+           regionPruneInfo.getPruneUpperBound(), regionPruneInfo.getPruneRecordTime());
+    }
+
+    public RegionPruneInfoPretty(byte[] regionName, String regionNameAsString,
+                                 long pruneUpperBound, long pruneRecordTime) {
+      super(regionName, regionNameAsString, pruneUpperBound, pruneRecordTime);
+      // The formatter is only needed while building the strings, so it is not kept as a field
+      SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+      // The prune upper bound goes through TxUtils.getTimestamp before being formatted as a date
+      pruneUpperBoundAsString = dateFormat.format(TxUtils.getTimestamp(pruneUpperBound));
+      pruneRecordTimeAsString = dateFormat.format(pruneRecordTime);
+    }
+
+    public String getPruneUpperBoundAsString() {
+      return pruneUpperBoundAsString;
+    }
+
+    public String getPruneRecordTimeAsString() {
+      return pruneRecordTimeAsString;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      if (!super.equals(o)) {
+        return false;
+      }
+      RegionPruneInfoPretty that = (RegionPruneInfoPretty) o;
+      return Objects.equals(pruneUpperBoundAsString, that.pruneUpperBoundAsString) &&
+        Objects.equals(pruneRecordTimeAsString, that.pruneRecordTimeAsString);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(super.hashCode(), pruneUpperBoundAsString, pruneRecordTimeAsString);
+    }
+
+    @Override
+    public String toString() {
+      return "RegionPruneInfoPretty{" +
+        "pruneUpperBoundAsString='" + pruneUpperBoundAsString + '\'' +
+        ", pruneRecordTimeAsString='" + pruneRecordTimeAsString + '\'' +
+        "} " + super.toString();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/db6ef6d2/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
new file mode 100644
index 0000000..677710b
--- /dev/null
+++ b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Thread that will write the the prune upper bound. An instance of this class should be obtained only
+ * through {@link PruneUpperBoundWriterSupplier} which will also handle the lifecycle of this instance.
+ */
+public class PruneUpperBoundWriter extends AbstractIdleService {
+ private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
+
+ private final TableName tableName;
+ private final DataJanitorState dataJanitorState;
+ private final long pruneFlushInterval;
+ // Map of region name -> prune upper bound
+ private final ConcurrentSkipListMap<byte[], Long> pruneEntries;
+ // Map of region name -> time the region was found to be empty
+ private final ConcurrentSkipListMap<byte[], Long> emptyRegions;
+
+ private volatile Thread flushThread;
+ private volatile boolean stopped;
+
+ private long lastChecked;
+
+ @SuppressWarnings("WeakerAccess")
+ public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) {
+ this.tableName = tableName;
+ this.dataJanitorState = dataJanitorState;
+ this.pruneFlushInterval = pruneFlushInterval;
+ this.pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+ this.emptyRegions = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+ }
+
+ @SuppressWarnings("WeakerAccess")
+ public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+ warnIfNotRunning(regionName);
+ // The number of entries in this map is bound by the number of regions in this region server and thus it will not
+ // grow indefinitely
+ pruneEntries.put(regionName, pruneUpperBound);
+ }
+
+ @SuppressWarnings("WeakerAccess")
+ public void persistRegionEmpty(byte[] regionName, long time) {
+ warnIfNotRunning(regionName);
+ // The number of entries in this map is bound by the number of regions in this region server and thus it will not
+ // grow indefinitely
+ emptyRegions.put(regionName, time);
+ }
+
+ @SuppressWarnings("WeakerAccess")
+ public boolean isAlive() {
+ return flushThread != null && flushThread.isAlive();
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ LOG.info("Starting PruneUpperBoundWriter Thread.");
+ startFlushThread();
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ LOG.info("Stopping PruneUpperBoundWriter Thread.");
+ stopped = true;
+ if (flushThread != null) {
+ flushThread.interrupt();
+ flushThread.join(TimeUnit.SECONDS.toMillis(1));
+ if (flushThread.isAlive()) {
+ flushThread.interrupt();
+ flushThread.join(TimeUnit.SECONDS.toMillis(1));
+ }
+ }
+ }
+
+ private void startFlushThread() {
+ flushThread = new Thread("tephra-prune-upper-bound-writer") {
+ @Override
+ public void run() {
+ while ((!isInterrupted()) && (!stopped)) {
+ long now = System.currentTimeMillis();
+ if (now > (lastChecked + pruneFlushInterval)) {
+ // should flush data
+ try {
+ User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ // Record prune upper bound
+ while (!pruneEntries.isEmpty()) {
+ Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
+ dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
+ // We can now remove the entry only if the key and value match with what we wrote since it is
+ // possible that a new pruneUpperBound for the same key has been added
+ pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
+ }
+ // Record empty regions
+ while (!emptyRegions.isEmpty()) {
+ Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
+ dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
+ // We can now remove the entry only if the key and value match with what we wrote since it is
+ // possible that a new value for the same key has been added
+ emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
+ }
+ return null;
+ }
+ });
+ } catch (IOException ex) {
+ LOG.warn("Cannot record prune upper bound for a region to table " +
+ tableName.getNameWithNamespaceInclAsString(), ex);
+ }
+ lastChecked = now;
+ }
+
+ try {
+ TimeUnit.SECONDS.sleep(1);
+ } catch (InterruptedException ex) {
+ interrupt();
+ break;
+ }
+ }
+
+ LOG.info("PruneUpperBound Writer thread terminated.");
+ }
+ };
+
+ flushThread.setDaemon(true);
+ flushThread.start();
+ }
+
+ private void warnIfNotRunning(byte[] regionName) {
+ if (!isRunning() || !isAlive()) {
+ LOG.warn(String.format("Trying to persist prune upper bound for region %s when writer is not %s!",
+ Bytes.toStringBinary(regionName), isRunning() ? "alive" : "running"));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/db6ef6d2/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
new file mode 100644
index 0000000..cb93fab
--- /dev/null
+++ b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+
+import com.google.common.base.Supplier;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.coprocessor.CacheSupplier;
+import org.apache.tephra.coprocessor.ReferenceCountedSupplier;
+
+/**
+ * {@link CacheSupplier} of {@link PruneUpperBoundWriter}. Writers are obtained through a
+ * {@link ReferenceCountedSupplier} held in a static field, so every instance of this class goes through the
+ * same reference-counted supplier. The constructor arguments are only used when that supplier asks for a
+ * new writer to be created.
+ */
+public class PruneUpperBoundWriterSupplier implements CacheSupplier<PruneUpperBoundWriter> {
+
+  // Shared by all instances of this class
+  private static final ReferenceCountedSupplier<PruneUpperBoundWriter> sharedWriter =
+    new ReferenceCountedSupplier<>(PruneUpperBoundWriter.class.getSimpleName());
+
+  // Builds a new writer from the constructor arguments on demand
+  private final Supplier<PruneUpperBoundWriter> writerFactory;
+
+  public PruneUpperBoundWriterSupplier(final TableName tableName, final DataJanitorState dataJanitorState,
+                                       final long pruneFlushInterval) {
+    writerFactory = new Supplier<PruneUpperBoundWriter>() {
+      @Override
+      public PruneUpperBoundWriter get() {
+        return new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
+      }
+    };
+  }
+
+  @Override
+  public PruneUpperBoundWriter get() {
+    return sharedWriter.getOrCreate(writerFactory);
+  }
+
+  @Override
+  public void release() {
+    sharedWriter.release();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/db6ef6d2/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java
new file mode 100644
index 0000000..4ac8887
--- /dev/null
+++ b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.SortedSet;
+
+/**
+ * Contains information on the set of transactional regions recorded at a given time
+ */
+@SuppressWarnings("WeakerAccess")
+public class TimeRegions {
+ static final Function<byte[], String> BYTE_ARR_TO_STRING_FN =
+ new Function<byte[], String>() {
+ @Override
+ public String apply(byte[] input) {
+ return Bytes.toStringBinary(input);
+ }
+ };
+
+ private final long time;
+ private final SortedSet<byte[]> regions;
+
+ public TimeRegions(long time, SortedSet<byte[]> regions) {
+ this.time = time;
+ this.regions = regions;
+ }
+
+ public long getTime() {
+ return time;
+ }
+
+ public SortedSet<byte[]> getRegions() {
+ return regions;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TimeRegions that = (TimeRegions) o;
+ return time == that.time &&
+ Objects.equals(regions, that.regions);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(time, regions);
+ }
+
+ @Override
+ public String toString() {
+ Iterable<String> regionStrings = Iterables.transform(regions, BYTE_ARR_TO_STRING_FN);
+ return "TimeRegions{" +
+ "time=" + time +
+ ", regions=[" + Joiner.on(" ").join(regionStrings) + "]" +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/db6ef6d2/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java b/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
new file mode 100644
index 0000000..179b22e
--- /dev/null
+++ b/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Base class for tests that need a HBase cluster. A mini cluster is started before the tests of a class run
+ * ({@link #startMiniCluster()}) and shut down after they have finished ({@link #shutdownMiniCluster()}).
+ */
+@SuppressWarnings("WeakerAccess")
+public abstract class AbstractHBaseTableTest {
+  protected static HBaseTestingUtility testUtil;
+  protected static HBaseAdmin hBaseAdmin;
+  // If non-null when startMiniCluster() runs (e.g. set by a subclass, or left over from a previous test
+  // class in the same JVM), this configuration is used to create the testing utility
+  protected static Configuration conf;
+
+  @BeforeClass
+  public static void startMiniCluster() throws Exception {
+    testUtil = conf == null ? new HBaseTestingUtility() : new HBaseTestingUtility(conf);
+    conf = testUtil.getConfiguration();
+
+    // Tune down the connection thread pool size
+    conf.setInt("hbase.hconnection.threads.core", 5);
+    conf.setInt("hbase.hconnection.threads.max", 10);
+    // Tune down handler threads in regionserver
+    conf.setInt("hbase.regionserver.handler.count", 10);
+
+    // Set to random port
+    conf.setInt("hbase.master.port", 0);
+    conf.setInt("hbase.master.info.port", 0);
+    conf.setInt("hbase.regionserver.port", 0);
+    conf.setInt("hbase.regionserver.info.port", 0);
+    testUtil.startMiniCluster();
+    hBaseAdmin = testUtil.getHBaseAdmin();
+  }
+
+  @AfterClass
+  public static void shutdownMiniCluster() throws Exception {
+    try {
+      if (hBaseAdmin != null) {
+        hBaseAdmin.close();
+      }
+    } finally {
+      // testUtil is null if startMiniCluster() failed before assigning it; don't mask that failure with an NPE
+      if (testUtil != null) {
+        testUtil.shutdownMiniCluster();
+      }
+    }
+  }
+
+  /**
+   * Creates a table with the given column families and the {@link TransactionProcessor} coprocessor.
+   */
+  protected static Table createTable(byte[] tableName, byte[][] columnFamilies) throws Exception {
+    return createTable(tableName, columnFamilies, false,
+                       Collections.singletonList(TransactionProcessor.class.getName()));
+  }
+
+  /**
+   * Creates a table, waits up to 5 seconds for it to become available and returns a handle to it.
+   *
+   * @param tableName name of the table
+   * @param columnFamilies column families to create; each keeps all versions and gets a transactional TTL
+   * @param existingData whether the table is marked to allow reading non-transactional data
+   * @param coprocessors coprocessor class names, in the order in which they should be invoked
+   */
+  protected static Table createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
+                                     List<String> coprocessors) throws Exception {
+    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+    for (byte[] family : columnFamilies) {
+      HColumnDescriptor columnDesc = new HColumnDescriptor(family);
+      columnDesc.setMaxVersions(Integer.MAX_VALUE);
+      columnDesc.setValue(TxConstants.PROPERTY_TTL, String.valueOf(100000)); // in millis
+      desc.addFamily(columnDesc);
+    }
+    if (existingData) {
+      desc.setValue(TxConstants.READ_NON_TX_DATA, "true");
+    }
+    // Assign increasing priorities starting just above PRIORITY_USER, so that the
+    // order in list is the same order that coprocessors will be invoked
+    int priority = Coprocessor.PRIORITY_USER;
+    for (String coprocessor : coprocessors) {
+      desc.addCoprocessor(coprocessor, null, ++priority, null);
+    }
+    hBaseAdmin.createTable(desc);
+    testUtil.waitTableAvailable(tableName, 5000);
+    return testUtil.getConnection().getTable(TableName.valueOf(tableName));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/db6ef6d2/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/HBase20ConfigurationProviderTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/HBase20ConfigurationProviderTest.java b/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/HBase20ConfigurationProviderTest.java
new file mode 100644
index 0000000..7e7b3c0
--- /dev/null
+++ b/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/HBase20ConfigurationProviderTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.tephra.util.AbstractConfigurationProviderTest;
+import org.apache.tephra.util.HBaseVersion;
+
+import java.util.Collection;
+
+/**
+ * Runs the checks of {@link AbstractConfigurationProviderTest} for the HBase 2.0 module, with HBase 2.0
+ * as the only expected version.
+ */
+public class HBase20ConfigurationProviderTest extends AbstractConfigurationProviderTest {
+  private static final Collection<HBaseVersion.Version> EXPECTED_VERSIONS =
+    ImmutableList.of(HBaseVersion.Version.HBASE_20);
+
+  @Override
+  protected Collection<HBaseVersion.Version> getExpectedVersions() {
+    return EXPECTED_VERSIONS;
+  }
+}