You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2018/03/10 01:56:33 UTC

[4/6] incubator-tephra git commit: TEPHRA-272 Add HBase 2.0 compatibility module

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/db6ef6d2/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
new file mode 100644
index 0000000..42c9f84
--- /dev/null
+++ b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.txprune.TransactionPruningPlugin;
+import org.apache.tephra.util.TxUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Default implementation of the {@link TransactionPruningPlugin} for HBase.
+ *
+ * This plugin determines the prune upper bound for transactional HBase tables that use
+ * coprocessor {@link TransactionProcessor}.
+ *
+ * <h3>State storage:</h3>
+ *
+ * This plugin expects the TransactionProcessor to save the prune upper bound for invalid transactions
+ * after every major compaction of a region. Let's call this <i>(region, prune upper bound)</i>.
+ * In addition, the plugin also persists the following information on a run at time <i>t</i>
+ * <ul>
+ *   <li>
+ *     <i>(t, set of regions)</i>: Set of transactional regions at time <i>t</i>.
+ *     Transactional regions are regions of the tables that have the coprocessor TransactionProcessor
+ *     attached to them.
+ *   </li>
+ *   <li>
+ *     <i>(t, inactive transaction bound)</i>: This is the smallest not in-progress transaction that
+ *     will not have writes in any HBase regions that are created after time <i>t</i>.
+ *     This value is determined by the Transaction Service based on the transaction state at time <i>t</i>
+ *     and passed on to the plugin.
+ *   </li>
+ * </ul>
+ *
+ * <h3>Computing prune upper bound:</h3>
+ *
+ * In a typical HBase instance, there can be a constant change in the number of regions due to region creations,
+ * splits and merges. At any given time there can always be a region on which a major compaction has not been run.
+ * Since the prune upper bound will get recorded for a region only after a major compaction,
+ * using only the latest set of regions we may not be able to find the
+ * prune upper bounds for all the current regions. Hence we persist the set of regions that exist at the time
+ * of each run of the plugin, and use the historical region sets for time <i>t</i>, <i>t - 1</i>, etc.
+ * to determine the prune upper bound.
+ *
+ * From the regions saved at time <i>t</i>, <i>t - 1</i>, etc.,
+ * the plugin tries to find the latest <i>(t, set of regions)</i> where all regions have been major compacted,
+ * i.e, all regions have prune upper bound recorded in <i>(region, prune upper bound)</i>.
+ * <br/>
+ * If such a set is found for time <i>t1</i>, the prune upper bound returned by the plugin is the minimum of
+ * <ul>
+ *   <li>Prune upper bounds of regions in set <i>(t1, set of regions)</i></li>
+ *   <li>Inactive transaction bound from <i>(t1, inactive transaction bound)</i></li>
+ * </ul>
+ *
+ * <p/>
+ * Above, when we find <i>(t1, set of regions)</i>, there may be a region that was created after time <i>t1</i>,
+ * but has a data write from an invalid transaction that is smaller than the prune upper bounds of all
+ * regions in <i>(t1, set of regions)</i>. This is possible because <i>(region, prune upper bound)</i> persisted by
+ * TransactionProcessor is always the latest prune upper bound for a region.
+ * <br/>
+ * However a region created after time <i>t1</i> cannot have writes from an invalid transaction that is smaller than
+ * inactive transaction bound at the time the region was created.
+ * Since we limit the plugin prune upper bound using <i>(t1, inactive transaction bound)</i>,
+ * there should be no invalid transactions smaller than the plugin prune upper bound with writes in any
+ * transactional region of this HBase instance.
+ *
+ * <p/>
+ * Note: If your tables use a transactional coprocessor other than TransactionProcessor,
+ * then you may need to write a new plugin to compute prune upper bound for those tables.
+ */
+@SuppressWarnings("WeakerAccess")
+public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
+  public static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionPruningPlugin.class);
+
+  protected Configuration conf;
+  protected Connection connection;
+  protected DataJanitorState dataJanitorState;
+
+  @Override
+  public void initialize(Configuration conf) throws IOException {
+    this.conf = conf;
+    this.connection = ConnectionFactory.createConnection(conf);
+
+    final TableName stateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                                            TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+    LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
+    createPruneTable(stateTable);
+    this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+      @Override
+      public Table get() throws IOException {
+        return connection.getTable(stateTable);
+      }
+    });
+  }
+
+  /**
+   * Determines prune upper bound for the data store as mentioned above.
+   */
+  @Override
+  public long fetchPruneUpperBound(long time, long inactiveTransactionBound) throws IOException {
+    LOG.debug("Fetching prune upper bound for time {} and inactive transaction bound {}",
+              time, inactiveTransactionBound);
+    if (time < 0 || inactiveTransactionBound < 0) {
+      return -1;
+    }
+
+    // Get all the current transactional regions
+    SortedSet<byte[]> transactionalRegions = getTransactionalRegions();
+    if (!transactionalRegions.isEmpty()) {
+      LOG.debug("Saving {} transactional regions for time {}", transactionalRegions.size(), time);
+      dataJanitorState.saveRegionsForTime(time, transactionalRegions);
+      // Save inactive transaction bound for time as the final step.
+      // We can then use its existence to make sure that the data for a given time is complete or not
+      LOG.debug("Saving inactive transaction bound {} for time {}", inactiveTransactionBound, time);
+      dataJanitorState.saveInactiveTransactionBoundForTime(time, inactiveTransactionBound);
+    }
+
+    return computePruneUpperBound(new TimeRegions(time, transactionalRegions));
+  }
+
+  /**
+   * Cleans up state information that is no longer needed once the invalid list has been pruned.
+   * The following is removed -
+   * <ul>
+   *   <li>
+   *     <i>(region, prune upper bound)</i> - prune upper bounds smaller than maxPrunedInvalid,
+   *     but only for stale regions, i.e. regions not in the region set saved on or before {@code time}
+   *   </li>
+   *   <li>
+   *     <i>(t, set of regions)</i> - region sets that were recorded on or before the start time
+   *     of maxPrunedInvalid
+   *   </li>
+   *   <li>
+   *     <i>(t, inactive transaction bound)</i> - inactive transaction bounds that were recorded
+   *     on or before the start time of maxPrunedInvalid
+   *   </li>
+   *   <li>
+   *     empty region records that were recorded on or before the start time of maxPrunedInvalid
+   *   </li>
+   * </ul>
+   *
+   * @param time time of the prune run that has completed
+   * @param maxPrunedInvalid the prune upper bound that was used for pruning
+   * @throws IOException if the prune state cannot be read or updated
+   */
+  @Override
+  public void pruneComplete(long time, long maxPrunedInvalid) throws IOException {
+    LOG.debug("Prune complete for time {} and prune upper bound {}", time, maxPrunedInvalid);
+    if (!(time >= 0 && maxPrunedInvalid >= 0)) {
+      return;
+    }
+
+    // TransactionProcessor writes the per-region prune upper bounds while this class deletes them.
+    // To stay clear of an update/delete race, the regions recorded for the current time are kept
+    // and only the bounds of stale regions are deleted.
+    TimeRegions liveRegions = dataJanitorState.getRegionsOnOrBeforeTime(time);
+    if (liveRegions == null) {
+      LOG.warn("Cannot find saved regions on or before time {}", time);
+    } else {
+      LOG.debug("Deleting prune upper bounds smaller than {} for stale regions", maxPrunedInvalid);
+      dataJanitorState.deletePruneUpperBounds(maxPrunedInvalid, liveRegions.getRegions());
+    }
+
+    // Everything recorded up to the time encoded in the pruned transaction id is now obsolete
+    long cutoffTime = TxUtils.getTimestamp(maxPrunedInvalid);
+    LOG.debug("Deleting regions recorded before time {}", cutoffTime);
+    dataJanitorState.deleteAllRegionsOnOrBeforeTime(cutoffTime);
+    LOG.debug("Deleting inactive transaction bounds recorded on or before time {}", cutoffTime);
+    dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(cutoffTime);
+    LOG.debug("Deleting empty regions recorded on or before time {}", cutoffTime);
+    dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(cutoffTime);
+  }
+
+  /**
+   * Stops the plugin and closes the HBase connection, if one was opened.
+   * A failure to close the connection is logged and not propagated.
+   */
+  @Override
+  public void destroy() {
+    LOG.info("Stopping plugin...");
+    if (connection == null) {
+      // initialize() was never called, or it failed before the connection was created
+      return;
+    }
+    try {
+      connection.close();
+    } catch (IOException e) {
+      LOG.error("Got exception while closing HBase connection", e);
+    }
+  }
+
+  /**
+   * Creates the prune state table with the given {@link TableName}, unless it already exists.
+   * The table gets a single column family, {@link DataJanitorState#FAMILY}, that keeps one version.
+   *
+   * @param stateTable prune state table name
+   * @throws IOException if checking for or creating the table fails
+   */
+  protected void createPruneTable(TableName stateTable) throws IOException {
+    String displayName = stateTable.getNameWithNamespaceInclAsString();
+    try (Admin admin = this.connection.getAdmin()) {
+      if (!admin.tableExists(stateTable)) {
+        HColumnDescriptor stateFamily = new HColumnDescriptor(DataJanitorState.FAMILY).setMaxVersions(1);
+        HTableDescriptor descriptor = new HTableDescriptor(stateTable);
+        descriptor.addFamily(stateFamily);
+        admin.createTable(descriptor);
+        LOG.info("Created pruneTable {}", displayName);
+      } else {
+        LOG.debug("Not creating pruneStateTable {} since it already exists.",
+                  displayName);
+      }
+    } catch (TableExistsException ex) {
+      // Another client may have created the table between our existence check and the create call
+      LOG.debug("Not creating pruneStateTable {} since it already exists.",
+                displayName, ex);
+    }
+  }
+
+  /**
+   * Returns whether the table is a transactional table. By default, a table is identified as a
+   * transactional table if it has the coprocessor {@link TransactionProcessor} attached to it.
+   * Should be overridden if the users attach a different coprocessor.
+   *
+   * @param tableDescriptor {@link HTableDescriptor} of the table
+   * @return true if the table is transactional
+   */
+  protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
+    String coprocessorClassName = TransactionProcessor.class.getName();
+    return tableDescriptor.hasCoprocessor(coprocessorClassName);
+  }
+
+  /**
+   * Collects the region names of all tables that {@link #isTransactionalTable(HTableDescriptor)}
+   * reports as transactional.
+   *
+   * @return region names sorted with {@link Bytes#BYTES_COMPARATOR}; empty if there are none
+   * @throws IOException if the tables or their regions cannot be listed
+   */
+  protected SortedSet<byte[]> getTransactionalRegions() throws IOException {
+    SortedSet<byte[]> regionNames = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    try (Admin admin = connection.getAdmin()) {
+      HTableDescriptor[] allTables = admin.listTables();
+      int tableCount = allTables == null ? 0 : allTables.length;
+      LOG.debug("Got {} tables to process", tableCount);
+      if (allTables == null) {
+        return regionNames;
+      }
+
+      for (HTableDescriptor table : allTables) {
+        if (!isTransactionalTable(table)) {
+          LOG.debug("{} is not a transactional table", table.getTableName());
+          continue;
+        }
+
+        List<HRegionInfo> tableRegions = admin.getTableRegions(table.getTableName());
+        LOG.debug("Regions for table {}: {}", table.getTableName(), tableRegions);
+        if (tableRegions == null) {
+          continue;
+        }
+        for (HRegionInfo regionInfo : tableRegions) {
+          regionNames.add(regionInfo.getRegionName());
+        }
+      }
+    }
+    return regionNames;
+  }
+
+  /**
+   * Try to find the latest set of regions in which all regions have been major compacted, and
+   * compute prune upper bound from them. Starting from newest to oldest, this looks into the
+   * region set that has been saved periodically, and joins it with the prune upper bound data
+   * for a region recorded after a major compaction.
+   * <p/>
+   * A region set is skipped when no inactive transaction bound was recorded for its time, or when
+   * at least one of its regions (ignoring regions of tables that no longer exist) has neither a
+   * prune upper bound nor an empty-region record. In both cases the next older region set is tried.
+   *
+   * @param timeRegions the latest set of regions
+   * @return prune upper bound, or -1 if no recorded region set yields one
+   * @throws IOException when not able to talk to HBase
+   */
+  private long computePruneUpperBound(TimeRegions timeRegions) throws IOException {
+    // Get the tables for the current time from the latest regions set
+    final Set<TableName> existingTables = getTableNamesForRegions(timeRegions.getRegions());
+    LOG.debug("Tables for time {} = {}", timeRegions.getTime(), existingTables);
+
+    // The step to the next older region set lives in the loop's update clause so that it also runs
+    // when an iteration ends with 'continue'. With the step at the end of a do-while body, a
+    // 'continue' would re-examine the same region set forever.
+    for (TimeRegions current = timeRegions;
+         current != null;
+         current = dataJanitorState.getRegionsOnOrBeforeTime(current.getTime() - 1)) {
+      LOG.debug("Computing prune upper bound for {}", current);
+      SortedSet<byte[]> transactionalRegions = current.getRegions();
+      long time = current.getTime();
+
+      long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
+      LOG.debug("Got inactive transaction bound {}", inactiveTransactionBound);
+      // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
+      if (inactiveTransactionBound == -1) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
+                      "and hence the data must be incomplete", time);
+        }
+        continue;
+      }
+
+      // Remove non-existing tables from the transactional regions set, so that we don't lookup prune upper bounds
+      // for them. Since the deleted tables do not exist anymore, there is no need to make sure they have been
+      // compacted. This ensures that transient tables do not block pruning progress.
+      transactionalRegions = filterDeletedTableRegions(existingTables, transactionalRegions);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Transactional regions after removing the regions of non-existing tables = {}",
+                  Iterables.transform(transactionalRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+      }
+
+      // Get the prune upper bounds for all the transactional regions
+      Map<byte[], Long> pruneUpperBoundRegions =
+        dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
+      logPruneUpperBoundRegions(pruneUpperBoundRegions);
+
+      // Use inactiveTransactionBound as the prune upper bound for the empty regions since the regions that are
+      // recorded as empty after inactiveTransactionBoundTime will not have invalid data
+      // for transactions started on or before inactiveTransactionBoundTime
+      pruneUpperBoundRegions = handleEmptyRegions(inactiveTransactionBound, transactionalRegions,
+                                                  pruneUpperBoundRegions);
+
+      // If prune upper bounds are found for all the transactional regions, then compute the prune upper bound
+      // across all regions
+      if (!transactionalRegions.isEmpty() &&
+        pruneUpperBoundRegions.size() == transactionalRegions.size()) {
+        Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
+        long pruneUpperBound = Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
+        LOG.debug("Found prune upper bound {} for time {}", pruneUpperBound, time);
+        return pruneUpperBound;
+      }
+
+      if (LOG.isDebugEnabled()) {
+        Sets.SetView<byte[]> difference =
+          Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
+        LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}",
+                  time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN));
+      }
+    }
+    return -1;
+  }
+
+  private SortedSet<byte[]> filterDeletedTableRegions(final Set<TableName> existingTables,
+                                                      SortedSet<byte[]> transactionalRegions) {
+    return Sets.filter(transactionalRegions,
+                       new Predicate<byte[]>() {
+                         @Override
+                         public boolean apply(byte[] region) {
+                           return existingTables.contains(HRegionInfo.getTable(region));
+                         }
+                       });
+  }
+
+  /**
+   * Extracts the table name from each of the given region names.
+   *
+   * @param regions region names
+   * @return the distinct tables that the regions belong to
+   */
+  private Set<TableName> getTableNamesForRegions(Set<byte[]> regions) {
+    Set<TableName> tables = new HashSet<>(regions.size());
+    for (byte[] regionName : regions) {
+      TableName owningTable = HRegionInfo.getTable(regionName);
+      tables.add(owningTable);
+    }
+    return tables;
+  }
+
+  /**
+   * Adds an entry for every region that was recorded as empty after the time of the inactive
+   * transaction bound and that has no prune upper bound of its own. Such regions get
+   * {@code inactiveTransactionBound} as their prune upper bound.
+   *
+   * @param inactiveTransactionBound inactive transaction bound of the region set being examined
+   * @param transactionalRegions regions of the region set being examined
+   * @param pruneUpperBoundRegions prune upper bounds recorded for those regions
+   * @return unmodifiable map with the recorded prune upper bounds plus the entries for empty regions
+   * @throws IOException if the empty region records cannot be read
+   */
+  private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound,
+                                               SortedSet<byte[]> transactionalRegions,
+                                               Map<byte[], Long> pruneUpperBoundRegions) throws IOException {
+    long boundTime = TxUtils.getTimestamp(inactiveTransactionBound);
+    SortedSet<byte[]> emptyRegions =
+      dataJanitorState.getEmptyRegionsAfterTime(boundTime, transactionalRegions);
+    LOG.debug("Got empty transactional regions for inactive transaction bound time {}: {}",
+              boundTime, Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+
+    // A region recorded as empty after boundTime cannot hold invalid data of transactions that
+    // started on or before boundTime, so inactiveTransactionBound is a safe prune upper bound for it.
+    Map<byte[], Long> merged = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    merged.putAll(pruneUpperBoundRegions);
+    for (byte[] emptyRegion : emptyRegions) {
+      if (pruneUpperBoundRegions.containsKey(emptyRegion)) {
+        // A recorded prune upper bound takes precedence over the empty-region default
+        continue;
+      }
+      merged.put(emptyRegion, inactiveTransactionBound);
+    }
+    return Collections.unmodifiableMap(merged);
+  }
+
+  /**
+   * Logs the given region to prune upper bound map at debug level, with the region names
+   * converted to strings. Does nothing when debug logging is disabled.
+   *
+   * @param pruneUpperBoundRegions map of region name to prune upper bound
+   */
+  private void logPruneUpperBoundRegions(Map<byte[], Long> pruneUpperBoundRegions) {
+    if (!LOG.isDebugEnabled()) {
+      return;
+    }
+
+    Function<Map.Entry<byte[], Long>, Map.Entry<String, Long>> toReadableEntry =
+      new Function<Map.Entry<byte[], Long>, Map.Entry<String, Long>>() {
+        @Override
+        public Map.Entry<String, Long> apply(Map.Entry<byte[], Long> input) {
+          String regionName = TimeRegions.BYTE_ARR_TO_STRING_FN.apply(input.getKey());
+          return Maps.immutableEntry(regionName, input.getValue());
+        }
+      };
+    LOG.debug("Got region - prune upper bound map: {}",
+              Iterables.transform(pruneUpperBoundRegions.entrySet(), toReadableEntry));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/db6ef6d2/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java
new file mode 100644
index 0000000..5d7b871
--- /dev/null
+++ b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.txprune.RegionPruneInfo;
+import org.apache.tephra.txprune.hbase.InvalidListPruningDebug;
+import org.apache.tephra.txprune.hbase.RegionsAtTime;
+import org.apache.tephra.util.TimeMathParser;
+import org.apache.tephra.util.TxUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+/**
+ * Invalid List Pruning Debug Tool.
+ */
+public class InvalidListPruningDebugTool implements InvalidListPruningDebug {
+  private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebugTool.class);
+  private static final Gson GSON = new Gson();
+  private static final String NOW = "now";
+  @VisibleForTesting
+  static final String DATE_FORMAT = "d-MMM-yyyy HH:mm:ss z";
+
+  private DataJanitorState dataJanitorState;
+  private Connection connection;
+  private TableName tableName;
+
+  /**
+   * Initialize the Invalid List Debug Tool.
+   * @param conf {@link Configuration}
+   * @throws IOException when not able to create an HBase connection
+   */
+  @Override
+  @SuppressWarnings("WeakerAccess")
+  public void initialize(final Configuration conf) throws IOException {
+    LOG.debug("InvalidListPruningDebugMain : initialize method called");
+    connection = ConnectionFactory.createConnection(conf);
+    tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                           TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+    dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+      @Override
+      public Table get() throws IOException {
+        return connection.getTable(tableName);
+      }
+    });
+  }
+
+  /**
+   * Closes the HBase connection if one has been opened.
+   *
+   * @throws IOException if closing the connection fails
+   */
+  @Override
+  @SuppressWarnings("WeakerAccess")
+  public void destroy() throws IOException {
+    if (connection == null) {
+      return;
+    }
+    connection.close();
+  }
+
+  /**
+   * Returns a set of regions that are live but are neither empty nor have a prune upper bound recorded.
+   * These regions will stop the progress of pruning.
+   * <p/>
+   * Note that this can return false positives in the following case -
+   * At time 't' empty regions were recorded, and at time 't + 1' a prune iteration was invoked.
+   * Since a new set of regions was recorded at time 't + 1', all regions recorded as empty before time 't + 1'
+   * will now be reported as blocking the pruning, even though they are empty. This is because we cannot tell
+   * if those regions got any new data between time 't' and 't + 1'.
+   *
+   * @param numRegions maximum number of regions to return; a negative value returns all of them
+   * @param time time in milliseconds or relative time, regions recorded before the given time are returned
+   * @return {@link Set} of regions that needs to be compacted and flushed
+   * @throws IOException if the prune state cannot be read
+   */
+  @Override
+  @SuppressWarnings("WeakerAccess")
+  public Set<String> getRegionsToBeCompacted(Integer numRegions, String time) throws IOException {
+    // Region set recorded on or before the requested time
+    RegionsAtTime regionsAtTime = getRegionsOnOrBeforeTime(time);
+    if (regionsAtTime.getRegions().isEmpty()) {
+      return Collections.emptySet();
+    }
+    Long recordedTime = regionsAtTime.getTime();
+
+    // Only regions that are still part of the most recent region set are of interest
+    SortedSet<String> liveRegions = getRegionsOnOrBeforeTime(NOW).getRegions();
+    SortedSet<String> candidates = Sets.newTreeSet(Sets.intersection(liveRegions, regionsAtTime.getRegions()));
+
+    // Names of the regions that were recorded as empty after the region set was saved
+    SortedSet<byte[]> emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(recordedTime, null);
+    SortedSet<String> emptyRegionNames = new TreeSet<>();
+    for (byte[] emptyRegion : emptyRegions) {
+      emptyRegionNames.add(TimeRegions.BYTE_ARR_TO_STRING_FN.apply(emptyRegion));
+    }
+
+    // What is left after dropping the empty regions and the regions that have a prune upper bound
+    // recorded are the regions that block pruning
+    Set<String> blockingRegions = Sets.newHashSet(Sets.difference(candidates, emptyRegionNames));
+    for (RegionPruneInfo pruned : dataJanitorState.getPruneInfoForRegions(null)) {
+      blockingRegions.remove(pruned.getRegionNameAsString());
+    }
+
+    boolean returnAll = numRegions < 0 || numRegions >= blockingRegions.size();
+    if (returnAll) {
+      return blockingRegions;
+    }
+
+    // Cap the result at numRegions entries
+    Set<String> limited = new HashSet<>(numRegions);
+    for (String regionName : blockingRegions) {
+      if (limited.size() >= numRegions) {
+        break;
+      }
+      limited.add(regionName);
+    }
+    return limited;
+  }
+
+  /**
+   * Return the {@link RegionPruneInfo} of the regions that have the lowest prune upper bounds.
+   * If a negative number (e.g. -1) is passed in, all the regions and their prune upper bound will be returned.
+   * Note that only the regions that are known to be live will be returned, i.e. regions that are in the latest
+   * recorded region set as well as in the region set recorded on or before the given time.
+   *
+   * @param numRegions maximum number of regions to return; a negative value returns all of them
+   * @param time time in milliseconds or relative time, regions recorded before the given time are returned
+   * @return regions ordered by prune upper bound and then by region name; empty if there is nothing to report
+   * @throws IOException if the prune state cannot be read
+   */
+  @Override
+  @SuppressWarnings("WeakerAccess")
+  public SortedSet<RegionPruneInfoPretty> getIdleRegions(Integer numRegions, String time) throws IOException {
+    List<RegionPruneInfo> allPruneInfos = dataJanitorState.getPruneInfoForRegions(null);
+    if (allPruneInfos.isEmpty()) {
+      return new TreeSet<>();
+    }
+
+    // Fetch the latest live regions
+    RegionsAtTime latestRegions = getRegionsOnOrBeforeTime(NOW);
+
+    // Fetch the regions at the given time
+    RegionsAtTime timeRegions = getRegionsOnOrBeforeTime(time);
+    Set<String> liveRegions = Sets.intersection(latestRegions.getRegions(), timeRegions.getRegions());
+
+    // Keep only the prune infos of regions that are live
+    List<RegionPruneInfo> livePruneInfos = new ArrayList<>();
+    for (RegionPruneInfo pruneInfo : allPruneInfos) {
+      if (liveRegions.contains(pruneInfo.getRegionNameAsString())) {
+        livePruneInfos.add(pruneInfo);
+      }
+    }
+
+    // A negative limit means "no limit"
+    int limit = numRegions < 0 ? livePruneInfos.size() : numRegions;
+
+    Comparator<RegionPruneInfo> comparator = new Comparator<RegionPruneInfo>() {
+      @Override
+      public int compare(RegionPruneInfo o1, RegionPruneInfo o2) {
+        int result = Long.compare(o1.getPruneUpperBound(), o2.getPruneUpperBound());
+        if (result == 0) {
+          return o1.getRegionNameAsString().compareTo(o2.getRegionNameAsString());
+        }
+        return result;
+      }
+    };
+
+    SortedSet<RegionPruneInfoPretty> regions = new TreeSet<>(comparator);
+    if (limit == 0) {
+      // Nothing to return. This also avoids MinMaxPriorityQueue's maximumSize(), which rejects
+      // any value that is not greater than zero.
+      return regions;
+    }
+
+    // The bounded queue evicts its greatest element when full, so it retains the 'limit' lowest entries
+    MinMaxPriorityQueue<RegionPruneInfoPretty> lowestPrunes =
+      MinMaxPriorityQueue.orderedBy(comparator).maximumSize(limit).create();
+    for (RegionPruneInfo pruneInfo : livePruneInfos) {
+      lowestPrunes.add(new RegionPruneInfoPretty(pruneInfo));
+    }
+
+    regions.addAll(lowestPrunes);
+    return regions;
+  }
+
+  /**
+   * Return the prune upper bound value of a given region. If no prune upper bound has been written for this region yet,
+   * it will return a null.
+   *
+   * @param regionId region id
+   * @return {@link RegionPruneInfo} of the region
+   * @throws IOException if there are any errors while trying to fetch the {@link RegionPruneInfo}
+   */
+  @Override
+  @SuppressWarnings("WeakerAccess")
+  @Nullable
+  public RegionPruneInfoPretty getRegionPruneInfo(String regionId) throws IOException {
+    RegionPruneInfo pruneInfo = dataJanitorState.getPruneInfoForRegion(Bytes.toBytesBinary(regionId));
+    return pruneInfo == null ? null : new RegionPruneInfoPretty(pruneInfo);
+  }
+
+  /**
+   * Returns the transactional regions that were recorded at or before the given time.
+   *
+   * @param timeString the time of interest, either in milliseconds or as a relative time (for example
+   *                   {@code now-1d}); it is parsed by {@link TimeMathParser}
+   * @return the recorded time and the names of the transactional regions present at that time; if nothing was
+   *         recorded at or before the given time, the parsed time with an empty set of regions
+   * @throws IOException if there are any errors while trying to fetch the {@link TimeRegions}
+   */
+  @Override
+  @SuppressWarnings("WeakerAccess")
+  public RegionsAtTime getRegionsOnOrBeforeTime(String timeString) throws IOException {
+    long time = TimeMathParser.parseTime(timeString, TimeUnit.MILLISECONDS);
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+    TimeRegions recorded = dataJanitorState.getRegionsOnOrBeforeTime(time);
+
+    SortedSet<String> regionNames = new TreeSet<>();
+    if (recorded == null) {
+      // Nothing was recorded at or before the requested time: report the requested time with no regions
+      return new RegionsAtTime(time, regionNames, dateFormat);
+    }
+
+    // Convert the binary region names into their printable string form
+    Iterables.addAll(regionNames, Iterables.transform(recorded.getRegions(), TimeRegions.BYTE_ARR_TO_STRING_FN));
+    return new RegionsAtTime(recorded.getTime(), regionNames, dateFormat);
+  }
+
+  /**
+   * Prints the command line help of this tool: the available commands, a short description of each, and the
+   * meaning of the {@code limit} and {@code time} parameters.
+   *
+   * @param pw writer the usage text is printed to
+   */
+  private void printUsage(PrintWriter pw) {
+    pw.println();
+    // The class that carries main() is InvalidListPruningDebugTool, so that is the name a user has to run
+    pw.println("Usage : org.apache.tephra.hbase.txprune.InvalidListPruningDebugTool <command> <parameters>");
+    pw.println();
+    pw.println("Available commands");
+    pw.println("------------------");
+    pw.println("to-compact-regions limit [time]");
+    pw.println("Desc: Prints out the regions that are active, but not empty, " +
+                 "and have not registered a prune upper bound.");
+    pw.println();
+    pw.println("idle-regions limit [time]");
+    pw.println("Desc: Prints out the regions that have the lowest prune upper bounds.");
+    pw.println();
+    pw.println("prune-info region-name-as-string");
+    pw.println("Desc: Prints the prune upper bound and the time it was recorded for the given region.");
+    pw.println();
+    pw.println("time-region [time]");
+    pw.println("Desc: Prints out the transactional regions present in HBase recorded at or before the given time.");
+    pw.println();
+    pw.println("Parameters");
+    pw.println("----------");
+    pw.println(" * limit - used to limit the number of regions returned, -1 to apply no limit");
+    pw.println(" * time  - if time is not provided, the current time is used. ");
+    pw.println("             When provided, the data recorded on or before the given time is returned.");
+    pw.println("             Time can be provided in milliseconds, or can be provided as a relative time.");
+    pw.println("             Examples for relative time -");
+    pw.println("             now = current time,");
+    pw.println("             now-1d = current time - 1 day,");
+    pw.println("             now-1d+4h = 20 hours before now,");
+    pw.println("             now+5s = current time + 5 seconds");
+    pw.println();
+  }
+
+  /**
+   * Runs the debug command given on the command line and prints its result as JSON to {@code out}.
+   * If the command is unknown, or its arguments are missing, superfluous or malformed, the usage text is
+   * printed instead.
+   *
+   * @param args command line arguments; {@code args[0]} is the command name, the remaining elements are the
+   *             parameters of that command
+   * @param out writer that receives the command output or the usage text
+   * @return {@code true} if a command was executed, {@code false} if the usage text was printed
+   * @throws IOException if the executed command fails to fetch its data
+   */
+  @VisibleForTesting
+  boolean execute(String[] args, PrintWriter out) throws IOException {
+    if (args.length < 1) {
+      printUsage(out);
+      return false;
+    }
+
+    String command = args[0];
+    switch (command) {
+      case "time-region":
+        // The time is optional
+        if (args.length <= 2) {
+          String time = args.length == 2 ? args[1] : NOW;
+          RegionsAtTime timeRegion = getRegionsOnOrBeforeTime(time);
+          out.println(GSON.toJson(timeRegion));
+          return true;
+        }
+        break;
+      case "idle-regions":
+        // The limit is mandatory, the time is optional
+        if (args.length == 2 || args.length == 3) {
+          Integer numRegions = parseRegionLimit(args[1], out);
+          if (numRegions != null) {
+            String time = args.length == 3 ? args[2] : NOW;
+            SortedSet<RegionPruneInfoPretty> regionPruneInfos = getIdleRegions(numRegions, time);
+            out.println(GSON.toJson(regionPruneInfos));
+            return true;
+          }
+        }
+        break;
+      case "prune-info":
+        if (args.length == 2) {
+          String regionName = args[1];
+          RegionPruneInfo regionPruneInfo = getRegionPruneInfo(regionName);
+          if (regionPruneInfo != null) {
+            out.println(GSON.toJson(regionPruneInfo));
+          } else {
+            out.println(String.format("No prune info found for the region %s.", regionName));
+          }
+          return true;
+        }
+        break;
+      case "to-compact-regions":
+        // The limit is mandatory, the time is optional
+        if (args.length == 2 || args.length == 3) {
+          Integer numRegions = parseRegionLimit(args[1], out);
+          if (numRegions != null) {
+            String time = args.length == 3 ? args[2] : NOW;
+            Set<String> toBeCompactedRegions = getRegionsToBeCompacted(numRegions, time);
+            out.println(GSON.toJson(toBeCompactedRegions));
+            return true;
+          }
+        }
+        break;
+    }
+
+    // Unknown command or invalid arguments
+    printUsage(out);
+    return false;
+  }
+
+  /**
+   * Parses the {@code limit} parameter of a command, reporting a malformed value on {@code out}.
+   *
+   * @param limit the limit as given on the command line
+   * @param out writer that receives the error message if the limit is not an integer
+   * @return the parsed limit, or {@code null} if the given value is not an integer
+   */
+  @Nullable
+  private static Integer parseRegionLimit(String limit, PrintWriter out) {
+    try {
+      return Integer.valueOf(limit);
+    } catch (NumberFormatException ex) {
+      out.println(String.format("Invalid limit '%s', expected an integer.", limit));
+      return null;
+    }
+  }
+
+  /**
+   * Command line entry point of the debug tool. Initializes the tool with the default HBase configuration,
+   * executes the command given in {@code args} and releases the tool's resources again.
+   * The JVM exits with status 1 if the command could not be executed.
+   *
+   * @param args the command followed by its parameters
+   */
+  public static void main(String[] args) {
+    Configuration hConf = HBaseConfiguration.create();
+    InvalidListPruningDebugTool pruningDebug = new InvalidListPruningDebugTool();
+    boolean success = false;
+    try (PrintWriter out = new PrintWriter(System.out)) {
+      pruningDebug.initialize(hConf);
+      try {
+        success = pruningDebug.execute(args, out);
+      } finally {
+        // Release the resources even if the command failed
+        pruningDebug.destroy();
+      }
+    } catch (IOException ex) {
+      LOG.error("Received an exception while trying to execute the debug tool. ", ex);
+    }
+    // Exit only after the writer has been closed, otherwise the buffered output (e.g. the usage text) is lost
+    if (!success) {
+      System.exit(1);
+    }
+  }
+
+  /**
+   * Wrapper class around {@link RegionPruneInfo} to print human readable dates for timestamps.
+   * In addition to the fields of {@link RegionPruneInfo}, it carries the prune upper bound and the prune record
+   * time formatted as date strings.
+   */
+  @SuppressWarnings({"WeakerAccess", "unused"})
+  public static class RegionPruneInfoPretty extends RegionPruneInfo {
+    private final String pruneUpperBoundAsString;
+    private final String pruneRecordTimeAsString;
+
+    /**
+     * Creates a pretty-printable copy of the given prune info.
+     *
+     * @param regionPruneInfo the prune info to wrap
+     */
+    public RegionPruneInfoPretty(RegionPruneInfo regionPruneInfo) {
+      this(regionPruneInfo.getRegionName(), regionPruneInfo.getRegionNameAsString(),
+           regionPruneInfo.getPruneUpperBound(), regionPruneInfo.getPruneRecordTime());
+    }
+
+    /**
+     * @param regionName name of the region
+     * @param regionNameAsString name of the region as a string
+     * @param pruneUpperBound prune upper bound of the region
+     * @param pruneRecordTime time at which the prune upper bound was recorded
+     */
+    public RegionPruneInfoPretty(byte[] regionName, String regionNameAsString,
+                                 long pruneUpperBound, long pruneRecordTime) {
+      super(regionName, regionNameAsString, pruneUpperBound, pruneRecordTime);
+      // The formatter is only needed here, so it is not kept as per-instance state
+      SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+      pruneUpperBoundAsString = dateFormat.format(TxUtils.getTimestamp(pruneUpperBound));
+      pruneRecordTimeAsString = dateFormat.format(pruneRecordTime);
+    }
+
+    /**
+     * @return the prune upper bound formatted as a date string
+     */
+    public String getPruneUpperBoundAsString() {
+      return pruneUpperBoundAsString;
+    }
+
+    /**
+     * @return the prune record time formatted as a date string
+     */
+    public String getPruneRecordTimeAsString() {
+      return pruneRecordTimeAsString;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      if (!super.equals(o)) {
+        return false;
+      }
+      RegionPruneInfoPretty that = (RegionPruneInfoPretty) o;
+      return Objects.equals(pruneUpperBoundAsString, that.pruneUpperBoundAsString) &&
+        Objects.equals(pruneRecordTimeAsString, that.pruneRecordTimeAsString);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(super.hashCode(), pruneUpperBoundAsString, pruneRecordTimeAsString);
+    }
+
+    @Override
+    public String toString() {
+      // No separator before the first field
+      return "RegionPruneInfoPretty{" +
+        "pruneUpperBoundAsString='" + pruneUpperBoundAsString + '\'' +
+        ", pruneRecordTimeAsString='" + pruneRecordTimeAsString + '\'' +
+        "} " + super.toString();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/db6ef6d2/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
new file mode 100644
index 0000000..677710b
--- /dev/null
+++ b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Thread that will write the the prune upper bound. An instance of this class should be obtained only
+ * through {@link PruneUpperBoundWriterSupplier} which will also handle the lifecycle of this instance.
+ */
+public class PruneUpperBoundWriter extends AbstractIdleService {
+  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
+
+  private final TableName tableName;
+  private final DataJanitorState dataJanitorState;
+  private final long pruneFlushInterval;
+  // Map of region name -> prune upper bound
+  private final ConcurrentSkipListMap<byte[], Long> pruneEntries;
+  // Map of region name -> time the region was found to be empty
+  private final ConcurrentSkipListMap<byte[], Long> emptyRegions;
+
+  private volatile Thread flushThread;
+  private volatile boolean stopped;
+
+  private long lastChecked;
+
+  @SuppressWarnings("WeakerAccess")
+  public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) {
+    this.tableName = tableName;
+    this.dataJanitorState = dataJanitorState;
+    this.pruneFlushInterval = pruneFlushInterval;
+    this.pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+    this.emptyRegions = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+  }
+
+  @SuppressWarnings("WeakerAccess")
+  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+    warnIfNotRunning(regionName);
+    // The number of entries in this map is bound by the number of regions in this region server and thus it will not
+    // grow indefinitely
+    pruneEntries.put(regionName, pruneUpperBound);
+  }
+
+  @SuppressWarnings("WeakerAccess")
+  public void persistRegionEmpty(byte[] regionName, long time) {
+    warnIfNotRunning(regionName);
+    // The number of entries in this map is bound by the number of regions in this region server and thus it will not
+    // grow indefinitely
+    emptyRegions.put(regionName, time);
+  }
+
+  @SuppressWarnings("WeakerAccess")
+  public boolean isAlive() {
+    return flushThread != null && flushThread.isAlive();
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    LOG.info("Starting PruneUpperBoundWriter Thread.");
+    startFlushThread();
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    LOG.info("Stopping PruneUpperBoundWriter Thread.");
+    stopped = true;
+    if (flushThread != null) {
+      flushThread.interrupt();
+      flushThread.join(TimeUnit.SECONDS.toMillis(1));
+      if (flushThread.isAlive()) {
+        flushThread.interrupt();
+        flushThread.join(TimeUnit.SECONDS.toMillis(1));
+      }
+    }
+  }
+
+  private void startFlushThread() {
+    flushThread = new Thread("tephra-prune-upper-bound-writer") {
+      @Override
+      public void run() {
+        while ((!isInterrupted()) && (!stopped)) {
+          long now = System.currentTimeMillis();
+          if (now > (lastChecked + pruneFlushInterval)) {
+            // should flush data
+            try {
+              User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                  // Record prune upper bound
+                  while (!pruneEntries.isEmpty()) {
+                    Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
+                    dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
+                    // We can now remove the entry only if the key and value match with what we wrote since it is
+                    // possible that a new pruneUpperBound for the same key has been added
+                    pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
+                  }
+                  // Record empty regions
+                  while (!emptyRegions.isEmpty()) {
+                    Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
+                    dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
+                    // We can now remove the entry only if the key and value match with what we wrote since it is
+                    // possible that a new value for the same key has been added
+                    emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
+                  }
+                  return null;
+                }
+              });
+            } catch (IOException ex) {
+              LOG.warn("Cannot record prune upper bound for a region to table " +
+                         tableName.getNameWithNamespaceInclAsString(), ex);
+            }
+            lastChecked = now;
+          }
+
+          try {
+            TimeUnit.SECONDS.sleep(1);
+          } catch (InterruptedException ex) {
+            interrupt();
+            break;
+          }
+        }
+
+        LOG.info("PruneUpperBound Writer thread terminated.");
+      }
+    };
+
+    flushThread.setDaemon(true);
+    flushThread.start();
+  }
+
+  private void warnIfNotRunning(byte[] regionName) {
+    if (!isRunning() || !isAlive()) {
+      LOG.warn(String.format("Trying to persist prune upper bound for region %s when writer is not %s!",
+                             Bytes.toStringBinary(regionName), isRunning() ? "alive" : "running"));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/db6ef6d2/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
new file mode 100644
index 0000000..cb93fab
--- /dev/null
+++ b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+
+import com.google.common.base.Supplier;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.coprocessor.CacheSupplier;
+import org.apache.tephra.coprocessor.ReferenceCountedSupplier;
+
+/**
+ * Supplies instances of {@link PruneUpperBoundWriter} implementations. All instances of this class hand out
+ * writers through one static {@link ReferenceCountedSupplier}.
+ */
+public class PruneUpperBoundWriterSupplier implements CacheSupplier<PruneUpperBoundWriter> {
+
+  private static final ReferenceCountedSupplier<PruneUpperBoundWriter> REFERENCE_COUNTED_SUPPLIER =
+    new ReferenceCountedSupplier<>(PruneUpperBoundWriter.class.getSimpleName());
+
+  // Creates a new writer from the constructor arguments whenever the reference counted supplier asks for one
+  private final Supplier<PruneUpperBoundWriter> writerFactory;
+
+  /**
+   * @param tableName table name passed on to the created {@link PruneUpperBoundWriter}
+   * @param dataJanitorState state passed on to the created {@link PruneUpperBoundWriter}
+   * @param pruneFlushInterval flush interval passed on to the created {@link PruneUpperBoundWriter}
+   */
+  public PruneUpperBoundWriterSupplier(final TableName tableName, final DataJanitorState dataJanitorState,
+                                       final long pruneFlushInterval) {
+    this.writerFactory = new Supplier<PruneUpperBoundWriter>() {
+      @Override
+      public PruneUpperBoundWriter get() {
+        return new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
+      }
+    };
+  }
+
+  /**
+   * @return the writer obtained from the shared reference counted supplier
+   */
+  @Override
+  public PruneUpperBoundWriter get() {
+    return REFERENCE_COUNTED_SUPPLIER.getOrCreate(writerFactory);
+  }
+
+  /**
+   * Releases the writer on the shared reference counted supplier.
+   */
+  public void release() {
+    REFERENCE_COUNTED_SUPPLIER.release();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/db6ef6d2/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java
new file mode 100644
index 0000000..4ac8887
--- /dev/null
+++ b/tephra-hbase-compat-2.0/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.Objects;
+import java.util.SortedSet;
+
+/**
+ * Contains information on the set of transactional regions recorded at a given time
+ */
+@SuppressWarnings("WeakerAccess")
+public class TimeRegions {
+  static final Function<byte[], String> BYTE_ARR_TO_STRING_FN =
+    new Function<byte[], String>() {
+      @Override
+      public String apply(byte[] input) {
+        return Bytes.toStringBinary(input);
+      }
+    };
+
+  private final long time;
+  private final SortedSet<byte[]> regions;
+
+  public TimeRegions(long time, SortedSet<byte[]> regions) {
+    this.time = time;
+    this.regions = regions;
+  }
+
+  public long getTime() {
+    return time;
+  }
+
+  public SortedSet<byte[]> getRegions() {
+    return regions;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    TimeRegions that = (TimeRegions) o;
+    return time == that.time &&
+      Objects.equals(regions, that.regions);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(time, regions);
+  }
+
+  @Override
+  public String toString() {
+    Iterable<String> regionStrings = Iterables.transform(regions, BYTE_ARR_TO_STRING_FN);
+    return "TimeRegions{" +
+      "time=" + time +
+      ", regions=[" + Joiner.on(" ").join(regionStrings) + "]" +
+      '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/db6ef6d2/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java b/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
new file mode 100644
index 0000000..179b22e
--- /dev/null
+++ b/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Base class for tests that need a HBase cluster. A mini cluster is started before the tests of a class run
+ * and is shut down after they have finished.
+ */
+@SuppressWarnings("WeakerAccess")
+public abstract class AbstractHBaseTableTest {
+  protected static HBaseTestingUtility testUtil;
+  protected static HBaseAdmin hBaseAdmin;
+  protected static Configuration conf;
+
+  /**
+   * Starts the mini cluster, reusing {@link #conf} if it has already been set.
+   */
+  @BeforeClass
+  public static void startMiniCluster() throws Exception {
+    if (conf == null) {
+      testUtil = new HBaseTestingUtility();
+    } else {
+      testUtil = new HBaseTestingUtility(conf);
+    }
+    conf = testUtil.getConfiguration();
+
+    // Tune down the connection thread pool size
+    conf.setInt("hbase.hconnection.threads.core", 5);
+    conf.setInt("hbase.hconnection.threads.max", 10);
+    // Tune down handler threads in regionserver
+    conf.setInt("hbase.regionserver.handler.count", 10);
+
+    // Set to random port
+    conf.setInt("hbase.master.port", 0);
+    conf.setInt("hbase.master.info.port", 0);
+    conf.setInt("hbase.regionserver.port", 0);
+    conf.setInt("hbase.regionserver.info.port", 0);
+
+    testUtil.startMiniCluster();
+    hBaseAdmin = testUtil.getHBaseAdmin();
+  }
+
+  /**
+   * Closes the admin, if there is one, and shuts down the mini cluster.
+   */
+  @AfterClass
+  public static void shutdownMiniCluster() throws Exception {
+    try {
+      if (hBaseAdmin != null) {
+        hBaseAdmin.close();
+      }
+    } finally {
+      // Shut the cluster down even if closing the admin failed
+      testUtil.shutdownMiniCluster();
+    }
+  }
+
+  /**
+   * Creates a table without the {@link TxConstants#READ_NON_TX_DATA} flag that has the
+   * {@link TransactionProcessor} coprocessor attached.
+   */
+  protected static Table createTable(byte[] tableName, byte[][] columnFamilies) throws Exception {
+    List<String> coprocessors = Collections.singletonList(TransactionProcessor.class.getName());
+    return createTable(tableName, columnFamilies, false, coprocessors);
+  }
+
+  /**
+   * Creates a table with the given column families and coprocessors and waits for it to become available.
+   *
+   * @param tableName name of the table to create
+   * @param columnFamilies column families of the table
+   * @param existingData if {@code true}, {@link TxConstants#READ_NON_TX_DATA} is set to "true" on the table
+   * @param coprocessors class names of the coprocessors to add, in the order of their invocation
+   * @return the created table
+   */
+  protected static Table createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
+                                      List<String> coprocessors) throws Exception {
+    TableName name = TableName.valueOf(tableName);
+    HTableDescriptor tableDescriptor = new HTableDescriptor(name);
+    for (byte[] columnFamily : columnFamilies) {
+      HColumnDescriptor familyDescriptor = new HColumnDescriptor(columnFamily);
+      familyDescriptor.setMaxVersions(Integer.MAX_VALUE);
+      familyDescriptor.setValue(TxConstants.PROPERTY_TTL, String.valueOf(100000)); // in millis
+      tableDescriptor.addFamily(familyDescriptor);
+    }
+    if (existingData) {
+      tableDescriptor.setValue(TxConstants.READ_NON_TX_DATA, "true");
+    }
+    // Each coprocessor gets the next priority value after PRIORITY_USER, so that the order in the list
+    // is the same order that coprocessors will be invoked
+    int priority = Coprocessor.PRIORITY_USER;
+    for (String coprocessor : coprocessors) {
+      priority++;
+      tableDescriptor.addCoprocessor(coprocessor, null, priority, null);
+    }
+    hBaseAdmin.createTable(tableDescriptor);
+    testUtil.waitTableAvailable(tableName, 5000);
+    return testUtil.getConnection().getTable(name);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/db6ef6d2/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/HBase20ConfigurationProviderTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/HBase20ConfigurationProviderTest.java b/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/HBase20ConfigurationProviderTest.java
new file mode 100644
index 0000000..7e7b3c0
--- /dev/null
+++ b/tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/HBase20ConfigurationProviderTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.tephra.util.AbstractConfigurationProviderTest;
+import org.apache.tephra.util.HBaseVersion;
+
+import java.util.Collection;
+
+/**
+ * Test for HBase 2.0 version specific behavior.
+ */
+public class HBase20ConfigurationProviderTest extends AbstractConfigurationProviderTest {
+  /**
+   * @return a collection whose only element is {@link HBaseVersion.Version#HBASE_20}
+   */
+  @Override
+  protected Collection<HBaseVersion.Version> getExpectedVersions() {
+    Collection<HBaseVersion.Version> expectedVersions = ImmutableList.of(HBaseVersion.Version.HBASE_20);
+    return expectedVersions;
+  }
+}