Posted to commits@tephra.apache.org by an...@apache.org on 2017/09/06 08:01:15 UTC

[4/5] incubator-tephra git commit: Support for HBase 1.3.x

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
new file mode 100644
index 0000000..40e2c37
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.util.TxUtils;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/**
+ * Applies filtering of data based on transactional visibility (HBase 1.3 specific version).
+ * Note: this is intended for server-side use only, as additional properties need to be set on
+ * any {@code Scan} or {@code Get} operation performed.
+ */
+public class TransactionVisibilityFilter extends FilterBase {
+  private final Transaction tx;
+  // oldest visible timestamp by column family, used to apply TTL when reading
+  private final Map<ImmutableBytesWritable, Long> oldestTsByFamily;
+  // if false, empty values will be interpreted as deletes
+  private final boolean allowEmptyValues;
+  // whether or not we can remove delete markers
+  // these can only be safely removed when we are traversing all storefiles
+  private final boolean clearDeletes;
+  // optional sub-filter to apply to visible cells
+  private final Filter cellFilter;
+  // since we traverse KVs in order, cache the current oldest TS to avoid map lookups per KV
+  private final ImmutableBytesWritable currentFamily = new ImmutableBytesWritable(HConstants.EMPTY_BYTE_ARRAY);
+
+  private long currentOldestTs;
+
+  private DeleteTracker deleteTracker = new DeleteTracker();
+
+  /**
+   * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions.
+   *
+   * @param tx the current transaction to apply.  Only data visible to this transaction will be returned.
+   * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
+   * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
+   *                         these will be interpreted as "delete" markers and the column will be filtered out
+   * @param scanType the type of scan operation being performed
+   */
+  public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
+                                     ScanType scanType) {
+    this(tx, ttlByFamily, allowEmptyValues, scanType, null);
+  }
+
+  /**
+   * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions.
+   *
+   * @param tx the current transaction to apply.  Only data visible to this transaction will be returned.
+   * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
+   * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
+   *                         these will be interpreted as "delete" markers and the column will be filtered out
+   * @param scanType the type of scan operation being performed
+   * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
+   *                   calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}.  If null, then
+   *                   {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
+   */
+  public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
+                                     ScanType scanType, @Nullable Filter cellFilter) {
+    this.tx = tx;
+    this.oldestTsByFamily = Maps.newTreeMap();
+    for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
+      long familyTTL = ttlEntry.getValue();
+      oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()),
+                           familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS);
+    }
+    this.allowEmptyValues = allowEmptyValues;
+    this.clearDeletes =
+      scanType == ScanType.COMPACT_DROP_DELETES ||
+        (scanType == ScanType.USER_SCAN && tx.getVisibilityLevel() != Transaction.VisibilityLevel.SNAPSHOT_ALL);
+    this.cellFilter = cellFilter;
+  }
+
+  @Override
+  public ReturnCode filterKeyValue(Cell cell) throws IOException {
+    if (!CellUtil.matchingFamily(cell, currentFamily.get(), currentFamily.getOffset(), currentFamily.getLength())) {
+      // column family changed
+      currentFamily.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
+      Long familyOldestTs = oldestTsByFamily.get(currentFamily);
+      currentOldestTs = familyOldestTs != null ? familyOldestTs : 0;
+      deleteTracker.reset();
+    }
+    // need to apply TTL for the column family here
+    long kvTimestamp = cell.getTimestamp();
+    if (TxUtils.getTimestampForTTL(kvTimestamp) < currentOldestTs) {
+      // passed TTL for this column, seek to next
+      return ReturnCode.NEXT_COL;
+    } else if (tx.isVisible(kvTimestamp)) {
+      // Return all writes done by current transaction (including deletes) for VisibilityLevel.SNAPSHOT_ALL
+      if (tx.getVisibilityLevel() == Transaction.VisibilityLevel.SNAPSHOT_ALL && tx.isCurrentWrite(kvTimestamp)) {
+        // cell is visible
+        // visibility SNAPSHOT_ALL needs all matches
+        return runSubFilter(ReturnCode.INCLUDE, cell);
+      }
+      if (DeleteTracker.isFamilyDelete(cell)) {
+        deleteTracker.addFamilyDelete(cell);
+        if (clearDeletes) {
+          return ReturnCode.NEXT_COL;
+        } else {
+          // cell is visible
+          // as soon as we find a KV to include we can move to the next column
+          return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell);
+        }
+      }
+      // check if masked by family delete
+      if (deleteTracker.isDeleted(cell)) {
+        return ReturnCode.NEXT_COL;
+      }
+      // check for column delete
+      if (isColumnDelete(cell)) {
+        if (clearDeletes) {
+          // skip "deleted" cell
+          return ReturnCode.NEXT_COL;
+        } else {
+          // keep the marker but skip any remaining versions
+          return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell);
+        }
+      }
+      // cell is visible
+      // as soon as we find a KV to include we can move to the next column
+      return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell);
+    } else {
+      return ReturnCode.SKIP;
+    }
+  }
+
+  private ReturnCode runSubFilter(ReturnCode txFilterCode, Cell cell) throws IOException {
+    if (cellFilter != null) {
+      ReturnCode subFilterCode = cellFilter.filterKeyValue(cell);
+      return determineReturnCode(txFilterCode, subFilterCode);
+    }
+    return txFilterCode;
+  }
+
+  /**
+   * Determines the return code of TransactionVisibilityFilter based on the sub-filter's return code.
+   * The sub-filter can only exclude cells included by TransactionVisibilityFilter, i.e., the sub-filter's
+   * INCLUDE will be ignored. This ensures that the sub-filter only sees cell versions valid for the
+   * given transaction. If the sub-filter needs to see older versions of a cell, this method can be overridden.
+   *
+   * @param txFilterCode return code from TransactionVisibilityFilter
+   * @param subFilterCode return code from sub-filter
+   * @return final return code
+   */
+  protected ReturnCode determineReturnCode(ReturnCode txFilterCode, ReturnCode subFilterCode) {
+    // Return the more restrictive of the two filter responses
+    switch (subFilterCode) {
+      case INCLUDE:
+        return txFilterCode;
+      case INCLUDE_AND_NEXT_COL:
+        return ReturnCode.INCLUDE_AND_NEXT_COL;
+      case SKIP:
+        return txFilterCode == ReturnCode.INCLUDE ? ReturnCode.SKIP : ReturnCode.NEXT_COL;
+      default:
+        return subFilterCode;
+    }
+  }
+
+  @Override
+  public boolean filterRow() throws IOException {
+    if (cellFilter != null) {
+      return cellFilter.filterRow();
+    }
+    return super.filterRow();
+  }
+
+  @Override
+  public Cell transformCell(Cell cell) throws IOException {
+    // Convert Tephra deletes back into HBase deletes
+    if (tx.getVisibilityLevel() == Transaction.VisibilityLevel.SNAPSHOT_ALL) {
+      if (DeleteTracker.isFamilyDelete(cell)) {
+        return new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), null, cell.getTimestamp(),
+                            KeyValue.Type.DeleteFamily);
+      } else if (isColumnDelete(cell)) {
+        // Note: in some cases KeyValue.Type.Delete is used in Delete object,
+        // and in some other cases KeyValue.Type.DeleteColumn is used.
+        // Since Tephra cannot distinguish between the two, we return KeyValue.Type.DeleteColumn.
+        // KeyValue.Type.DeleteColumn makes both CellUtil.isDelete and CellUtil.isDeleteColumns return true, and will
+        // work in both cases.
+        return new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
+                            cell.getTimestamp(), KeyValue.Type.DeleteColumn);
+      }
+    }
+    return cell;
+  }
+
+  @Override
+  public void reset() throws IOException {
+    deleteTracker.reset();
+    if (cellFilter != null) {
+      cellFilter.reset();
+    }
+  }
+
+  @Override
+  public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
+    if (cellFilter != null) {
+      return cellFilter.filterRowKey(buffer, offset, length);
+    }
+    return super.filterRowKey(buffer, offset, length);
+  }
+
+  @Override
+  public boolean filterAllRemaining() throws IOException {
+    if (cellFilter != null) {
+      return cellFilter.filterAllRemaining();
+    }
+    return super.filterAllRemaining();
+  }
+
+  @Override
+  public void filterRowCells(List<Cell> kvs) throws IOException {
+    if (cellFilter != null) {
+      cellFilter.filterRowCells(kvs);
+    } else {
+      super.filterRowCells(kvs);
+    }
+  }
+
+  @Override
+  public boolean hasFilterRow() {
+    if (cellFilter != null) {
+      return cellFilter.hasFilterRow();
+    }
+    return super.hasFilterRow();
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException {
+    if (cellFilter != null) {
+      return cellFilter.getNextKeyHint(currentKV);
+    }
+    return super.getNextKeyHint(currentKV);
+  }
+
+  @Override
+  public Cell getNextCellHint(Cell currentKV) throws IOException {
+    if (cellFilter != null) {
+      return cellFilter.getNextCellHint(currentKV);
+    }
+    return super.getNextCellHint(currentKV);
+  }
+
+  @Override
+  public boolean isFamilyEssential(byte[] name) throws IOException {
+    if (cellFilter != null) {
+      return cellFilter.isFamilyEssential(name);
+    }
+    return super.isFamilyEssential(name);
+  }
+
+  private boolean isColumnDelete(Cell cell) {
+    return !TxUtils.isPreExistingVersion(cell.getTimestamp()) && cell.getValueLength() == 0 && !allowEmptyValues;
+  }
+
+  private static final class DeleteTracker {
+    private long familyDeleteTs;
+    private byte[] rowKey;
+
+    public static boolean isFamilyDelete(Cell cell) {
+      return !TxUtils.isPreExistingVersion(cell.getTimestamp()) &&
+        CellUtil.matchingQualifier(cell, TxConstants.FAMILY_DELETE_QUALIFIER) &&
+        CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY);
+    }
+
+    public void addFamilyDelete(Cell delete) {
+      this.familyDeleteTs = delete.getTimestamp();
+      this.rowKey = Bytes.copy(delete.getRowArray(), delete.getRowOffset(), delete.getRowLength());
+    }
+
+    public boolean isDeleted(Cell cell) {
+      return rowKey != null &&
+        Bytes.compareTo(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+                        rowKey, 0, rowKey.length) == 0 &&
+        cell.getTimestamp() <= familyDeleteTs;
+    }
+
+    public void reset() {
+      this.familyDeleteTs = 0;
+      this.rowKey = null;
+    }
+  }
+}
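
The sketch below shows how server-side code could construct the filter above for a
user scan, assuming a Transaction is already available. The family name and TTL
value are illustrative only and not part of this patch:

  import java.util.Collections;
  import java.util.Map;

  import org.apache.hadoop.hbase.filter.Filter;
  import org.apache.hadoop.hbase.regionserver.ScanType;
  import org.apache.hadoop.hbase.util.Bytes;
  import org.apache.tephra.Transaction;
  import org.apache.tephra.hbase.coprocessor.TransactionVisibilityFilter;

  public class VisibilityFilterSketch {
    // Builds a visibility filter for a user scan of a table with family "d"
    static Filter newVisibilityFilter(Transaction tx) {
      // one-day TTL (in milliseconds) for family "d"; a TTL <= 0 disables TTL
      Map<byte[], Long> ttlByFamily =
          Collections.singletonMap(Bytes.toBytes("d"), 24 * 60 * 60 * 1000L);
      // allowEmptyValues = false: cells with empty values act as delete markers
      return new TransactionVisibilityFilter(tx, ttlByFamily, false, ScanType.USER_SCAN);
    }
  }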

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
new file mode 100644
index 0000000..9b856d9
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.apache.tephra.util.TxUtils;
+
+import java.io.IOException;
+import javax.annotation.Nullable;
+
+/**
+ * Records compaction state for invalid transaction list pruning.
+ */
+public class CompactionState {
+  private static final Log LOG = LogFactory.getLog(CompactionState.class);
+
+  private final byte[] regionName;
+  private final String regionNameAsString;
+  private final PruneUpperBoundWriterSupplier pruneUpperBoundWriterSupplier;
+  private final PruneUpperBoundWriter pruneUpperBoundWriter;
+
+  private volatile long pruneUpperBound = -1;
+
+  public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long pruneFlushInterval) {
+    this.regionName = env.getRegionInfo().getRegionName();
+    this.regionNameAsString = env.getRegionInfo().getRegionNameAsString();
+    DataJanitorState dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+      @Override
+      public Table get() throws IOException {
+        return env.getTable(stateTable);
+      }
+    });
+    this.pruneUpperBoundWriterSupplier = new PruneUpperBoundWriterSupplier(stateTable, dataJanitorState,
+                                                                           pruneFlushInterval);
+    this.pruneUpperBoundWriter = pruneUpperBoundWriterSupplier.get();
+  }
+
+  /**
+   * Records the transaction state used for a compaction. This method is called when the compaction starts.
+   *
+   * @param request {@link CompactionRequest} for the compaction
+   * @param snapshot transaction state that will be used for the compaction
+   */
+  public void record(CompactionRequest request, @Nullable TransactionVisibilityState snapshot) {
+    if (request.isMajor() && snapshot != null) {
+      Transaction tx = TxUtils.createDummyTransaction(snapshot);
+      pruneUpperBound = TxUtils.getPruneUpperBound(tx);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          String.format("Computed prune upper bound %s for compaction request %s using transaction state from time %s",
+                        pruneUpperBound, request, snapshot.getTimestamp()));
+      }
+    } else {
+      pruneUpperBound = -1;
+    }
+  }
+
+  /**
+   * Persists the prune upper bound recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}.
+   * This method is called after the compaction has successfully completed.
+   */
+  public void persist() {
+    if (pruneUpperBound != -1) {
+      pruneUpperBoundWriter.persistPruneEntry(regionName, pruneUpperBound);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
+      }
+    }
+  }
+
+  /**
+   * Persist that the given region was empty as of the given time.
+   * @param time time in milliseconds
+   */
+  public void persistRegionEmpty(long time) {
+    pruneUpperBoundWriter.persistRegionEmpty(regionName, time);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("Enqueued empty region %s at time %s", regionNameAsString, time));
+    }
+  }
+
+  /**
+   * Releases this instance's usage of the shared {@link PruneUpperBoundWriter}.
+   */
+  public void stop() {
+    pruneUpperBoundWriterSupplier.release();
+  }
+}
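
For context, the sketch below shows the intended lifecycle of CompactionState inside
a region observer (in Tephra this wiring is done by the TransactionProcessor
coprocessor). The state table name, flush interval, and hook method names are
illustrative assumptions, not part of this patch:

  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
  import org.apache.tephra.hbase.txprune.CompactionState;
  import org.apache.tephra.persist.TransactionVisibilityState;

  public class CompactionStateLifecycleSketch {
    private CompactionState compactionState;

    // on coprocessor start: one CompactionState per region
    void start(RegionCoprocessorEnvironment env) {
      compactionState = new CompactionState(env, TableName.valueOf("tephra.state"),
                                            60 * 1000L /* flush interval, ms */);
    }

    // before a compaction: record the prune upper bound for this request
    void onPreCompact(CompactionRequest request, TransactionVisibilityState snapshot) {
      compactionState.record(request, snapshot);
    }

    // after the compaction completes successfully: enqueue the bound for persistence
    void onPostCompact() {
      compactionState.persist();
    }

    // on coprocessor stop: release the shared PruneUpperBoundWriter
    void stop() {
      compactionState.stop();
    }
  }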

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
new file mode 100644
index 0000000..db59d7d
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.txprune.RegionPruneInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import javax.annotation.Nullable;
+
+/**
+ * Persist data janitor state into an HBase table.
+ * This is used by both {@link TransactionProcessor} and {@link HBaseTransactionPruningPlugin}
+ * to persist and read the compaction state.
+ */
+@SuppressWarnings("WeakerAccess")
+public class DataJanitorState {
+  private static final Log LOG = LogFactory.getLog(DataJanitorState.class);
+
+  public static final byte[] FAMILY = {'f'};
+  public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
+
+  private static final byte[] REGION_TIME_COL = {'r'};
+  private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
+  private static final byte[] EMPTY_REGION_TIME_COL = {'e'};
+
+  private static final byte[] REGION_KEY_PREFIX = {0x1};
+  private static final byte[] REGION_KEY_PREFIX_STOP = {0x2};
+
+  private static final byte[] REGION_TIME_KEY_PREFIX = {0x2};
+  private static final byte[] REGION_TIME_KEY_PREFIX_STOP = {0x3};
+
+  private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX = {0x3};
+  private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP = {0x4};
+
+  private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX = {0x4};
+  private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX_STOP = {0x5};
+
+  private static final byte[] REGION_TIME_COUNT_KEY_PREFIX = {0x5};
+  private static final byte[] REGION_TIME_COUNT_KEY_PREFIX_STOP = {0x6};
+
+  private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+  // This value can be used when we don't care about the value we write in a column
+  private static final byte[] COL_VAL = Bytes.toBytes('1');
+
+  private final TableSupplier stateTableSupplier;
+
+  public DataJanitorState(TableSupplier stateTableSupplier) {
+    this.stateTableSupplier = stateTableSupplier;
+  }
+
+  // ----------------------------------------------------------------
+  // ------- Methods for prune upper bound for a given region -------
+  // ----------------------------------------------------------------
+  // The data is stored in the following format -
+  // Key: 0x1<region-id>
+  // Col 'p': <prune upper bound>
+  // ----------------------------------------------------------------
+
+  /**
+   * Persist the latest prune upper bound for a given region. This is called by {@link TransactionProcessor}
+   * after major compaction.
+   *
+   * @param regionId region id
+   * @param pruneUpperBound the latest prune upper bound for the region
+   * @throws IOException when not able to persist the data to HBase
+   */
+  public void savePruneUpperBoundForRegion(byte[] regionId, long pruneUpperBound) throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      Put put = new Put(makeRegionKey(regionId));
+      put.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL, Bytes.toBytes(pruneUpperBound));
+      stateTable.put(put);
+    }
+  }
+
+  /**
+   * Get latest prune upper bound for a given region. This indicates the largest invalid transaction that no
+   * longer has writes in this region.
+   *
+   * @param regionId region id
+   * @return latest prune upper bound for the region
+   * @throws IOException when not able to read the data from HBase
+   */
+  public long getPruneUpperBoundForRegion(byte[] regionId) throws IOException {
+    RegionPruneInfo regionPruneInfo = getPruneInfoForRegion(regionId);
+    return (regionPruneInfo == null) ? -1 : regionPruneInfo.getPruneUpperBound();
+  }
+
+  /**
+   * Get the latest {@link RegionPruneInfo} for a given region.
+   *
+   * @param regionId region id
+   * @return {@link RegionPruneInfo} for the region
+   * @throws IOException when not able to read the data from HBase
+   */
+  @Nullable
+  public RegionPruneInfo getPruneInfoForRegion(byte[] regionId) throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      Get get = new Get(makeRegionKey(regionId));
+      get.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+      Cell cell = stateTable.get(get).getColumnLatestCell(FAMILY, PRUNE_UPPER_BOUND_COL);
+      if (cell == null) {
+        return null;
+      }
+      byte[] pruneUpperBoundBytes = CellUtil.cloneValue(cell);
+      long timestamp = cell.getTimestamp();
+      return new RegionPruneInfo(regionId, Bytes.toStringBinary(regionId),
+                                 Bytes.toLong(pruneUpperBoundBytes), timestamp);
+    }
+  }
+
+  /**
+   * Get the latest prune upper bounds for the given regions. This is the batch version of
+   * {@link #getPruneUpperBoundForRegion(byte[])}.
+   *
+   * @param regions a set of regions
+   * @return a map containing region id and its latest prune upper bound value
+   * @throws IOException when not able to read the data from HBase
+   */
+  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException {
+    Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    List<RegionPruneInfo> regionPruneInfos = getPruneInfoForRegions(regions);
+    for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+      resultMap.put(regionPruneInfo.getRegionName(), regionPruneInfo.getPruneUpperBound());
+    }
+    return Collections.unmodifiableMap(resultMap);
+  }
+
+  /**
+   * Gets a list of {@link RegionPruneInfo} for given regions. Returns all regions if the given regions set is null.
+   *
+   * @param regions a set of regions
+   * @return list of {@link RegionPruneInfo}s.
+   * @throws IOException when not able to read the data from HBase
+   */
+  public List<RegionPruneInfo> getPruneInfoForRegions(@Nullable SortedSet<byte[]> regions) throws IOException {
+    List<RegionPruneInfo> regionPruneInfos = new ArrayList<>();
+    try (Table stateTable = stateTableSupplier.get()) {
+      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+      try (ResultScanner scanner = stateTable.getScanner(scan)) {
+        Result next;
+        while ((next = scanner.next()) != null) {
+          byte[] region = getRegionFromKey(next.getRow());
+          if (regions == null || regions.contains(region)) {
+            Cell cell = next.getColumnLatestCell(FAMILY, PRUNE_UPPER_BOUND_COL);
+            if (cell != null) {
+              byte[] pruneUpperBoundBytes = CellUtil.cloneValue(cell);
+              long timestamp = cell.getTimestamp();
+              regionPruneInfos.add(new RegionPruneInfo(region, Bytes.toStringBinary(region),
+                                                       Bytes.toLong(pruneUpperBoundBytes), timestamp));
+            }
+          }
+        }
+      }
+    }
+    return Collections.unmodifiableList(regionPruneInfos);
+  }
+
+  /**
+   * Delete prune upper bounds for the regions that are not in the given exclude set and whose
+   * prune upper bound is less than the given value.
+   * After the invalid list is pruned up to deletionPruneUpperBound, we do not need entries for regions that have
+   * prune upper bound less than deletionPruneUpperBound. We however limit the deletion to only regions that are
+   * no longer in existence (due to deletion, etc.), to avoid update/delete race conditions.
+   *
+   * @param deletionPruneUpperBound prune upper bound below which regions will be deleted
+   * @param excludeRegions set of regions that should not be deleted
+   * @throws IOException when not able to delete data in HBase
+   */
+  public void deletePruneUpperBounds(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
+    throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+      try (ResultScanner scanner = stateTable.getScanner(scan)) {
+        Result next;
+        while ((next = scanner.next()) != null) {
+          byte[] region = getRegionFromKey(next.getRow());
+          if (!excludeRegions.contains(region)) {
+            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
+            if (timeBytes != null) {
+              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+              if (pruneUpperBoundRegion < deletionPruneUpperBound) {
+                stateTable.delete(new Delete(next.getRow()));
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  // ---------------------------------------------------
+  // ------- Methods for regions at a given time -------
+  // ---------------------------------------------------
+  // Key: 0x2<inverted time><region-id>
+  // Col 'r': COL_VAL (value is ignored)
+  // ---------------------------------------------------
+
+  /**
+   * Persist the regions for the given time. {@link HBaseTransactionPruningPlugin} saves the set of
+   * transactional regions existing in the HBase instance periodically.
+   *
+   * @param time timestamp in milliseconds
+   * @param regions set of regions at the time
+   * @throws IOException when not able to persist the data to HBase
+   */
+  public void saveRegionsForTime(long time, Set<byte[]> regions) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+    try (Table stateTable = stateTableSupplier.get()) {
+      for (byte[] region : regions) {
+        Put put = new Put(makeTimeRegionKey(timeBytes, region));
+        put.addColumn(FAMILY, REGION_TIME_COL, COL_VAL);
+        stateTable.put(put);
+      }
+
+      // Save the count of regions as a checksum
+      saveRegionCountForTime(stateTable, timeBytes, regions.size());
+    }
+  }
+
+  @VisibleForTesting
+  void saveRegionCountForTime(Table stateTable, byte[] timeBytes, int count) throws IOException {
+    Put put = new Put(makeTimeRegionCountKey(timeBytes));
+    put.addColumn(FAMILY, REGION_TIME_COL, Bytes.toBytes(count));
+    stateTable.put(put);
+  }
+
+  /**
+   * Return the set of regions saved for the time at or before the given time. This method finds the greatest time
+   * that is less than or equal to the given time, and then returns all regions with that exact time, but none that are
+   * older than that.
+   *
+   * @param time timestamp in milliseconds
+   * @return set of regions and time at which they were recorded, or null if no regions found
+   * @throws IOException when not able to read the data from HBase
+   */
+  @Nullable
+  public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      TimeRegions timeRegions;
+      while ((timeRegions = getNextSetOfTimeRegions(stateTable, time)) != null) {
+        int count = getRegionCountForTime(stateTable, timeRegions.getTime());
+        if (count != -1 && count == timeRegions.getRegions().size()) {
+          return timeRegions;
+        } else {
+          LOG.warn(String.format("Got incorrect count for regions saved at time %s, expected = %s but actual = %s",
+                                 timeRegions.getTime(), count, timeRegions.getRegions().size()));
+          time = timeRegions.getTime() - 1;
+        }
+      }
+      return null;
+    }
+  }
+
+  @Nullable
+  private TimeRegions getNextSetOfTimeRegions(Table stateTable, long time) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+    Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
+    scan.addColumn(FAMILY, REGION_TIME_COL);
+
+    long currentRegionTime = -1;
+    SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    Result next;
+    try (ResultScanner scanner = stateTable.getScanner(scan)) {
+      while ((next = scanner.next()) != null) {
+        Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow());
+        // Stop if reached next time value
+        if (currentRegionTime == -1) {
+          currentRegionTime = timeRegion.getKey();
+        } else if (timeRegion.getKey() < currentRegionTime) {
+          break;
+        } else if (timeRegion.getKey() > currentRegionTime) {
+          throw new IllegalStateException(
+            String.format("Got out of order time %d when expecting time less than or equal to %d",
+                          timeRegion.getKey(), currentRegionTime));
+        }
+        regions.add(timeRegion.getValue());
+      }
+    }
+    return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, Collections.unmodifiableSortedSet(regions));
+  }
+
+  @VisibleForTesting
+  int getRegionCountForTime(Table stateTable, long time) throws IOException {
+    Get get = new Get(makeTimeRegionCountKey(Bytes.toBytes(getInvertedTime(time))));
+    get.addColumn(FAMILY, REGION_TIME_COL);
+    Result result = stateTable.get(get);
+    byte[] value = result.getValue(FAMILY, REGION_TIME_COL);
+    return value == null ? -1 : Bytes.toInt(value);
+  }
+
+  /**
+   * Delete all the regions that were recorded for times less than or equal to the given time.
+   *
+   * @param time timestamp in milliseconds
+   * @throws IOException when not able to delete data in HBase
+   */
+  public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+    try (Table stateTable = stateTableSupplier.get()) {
+      // Delete the regions
+      Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, REGION_TIME_COL);
+      deleteFromScan(stateTable, scan);
+
+      // Delete the count
+      scan = new Scan(makeTimeRegionCountKey(timeBytes), REGION_TIME_COUNT_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, REGION_TIME_COL);
+      deleteFromScan(stateTable, scan);
+    }
+  }
+
+  // ---------------------------------------------------------------------
+  // ------- Methods for inactive transaction bound for given time -------
+  // ---------------------------------------------------------------------
+  // Key: 0x3<inverted time>
+  // Col 'i': <inactive transaction bound>
+  // ---------------------------------------------------------------------
+
+  /**
+   * Persist inactive transaction bound for a given time. This is the smallest not in-progress transaction that
+   * will not have writes in any HBase regions that are created after the given time.
+   *
+   * @param time time in milliseconds
+   * @param inactiveTransactionBound inactive transaction bound for the given time
+   * @throws IOException when not able to persist the data to HBase
+   */
+  public void saveInactiveTransactionBoundForTime(long time, long inactiveTransactionBound) throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      Put put = new Put(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))));
+      put.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL, Bytes.toBytes(inactiveTransactionBound));
+      stateTable.put(put);
+    }
+  }
+
+  /**
+   * Return inactive transaction bound for the given time.
+   *
+   * @param time time in milliseconds
+   * @return inactive transaction bound for the given time
+   * @throws IOException when not able to read the data from HBase
+   */
+  public long getInactiveTransactionBoundForTime(long time) throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      Get get = new Get(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))));
+      get.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
+      byte[] result = stateTable.get(get).getValue(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
+      return result == null ? -1 : Bytes.toLong(result);
+    }
+  }
+
+  /**
+   * Delete all inactive transaction bounds recorded for times less than or equal to the given time.
+   *
+   * @param time time in milliseconds
+   * @throws IOException when not able to delete data in HBase
+   */
+  public void deleteInactiveTransactionBoundsOnOrBeforeTime(long time) throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      Scan scan = new Scan(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))),
+                           INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL);
+      deleteFromScan(stateTable, scan);
+    }
+  }
+
+  // --------------------------------------------------------
+  // ------- Methods for empty regions at a given time -------
+  // --------------------------------------------------------
+  // Key: 0x4<time><region-id>
+  // Col 'e': COL_VAL (value is ignored)
+  // --------------------------------------------------------
+
+  /**
+   * Save the given region as empty as of the given time.
+   *
+   * @param time time in milliseconds
+   * @param regionId region id
+   */
+  public void saveEmptyRegionForTime(long time, byte[] regionId) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(time);
+    try (Table stateTable = stateTableSupplier.get()) {
+      Put put = new Put(makeEmptyRegionTimeKey(timeBytes, regionId));
+      put.addColumn(FAMILY, EMPTY_REGION_TIME_COL, COL_VAL);
+      stateTable.put(put);
+    }
+  }
+
+  /**
+   * Return regions that were recorded as empty after the given time.
+   *
+   * @param time time in milliseconds
+   * @param includeRegions If not null, the returned set will be an intersection of the includeRegions set
+   *                       and the empty regions after the given time
+   */
+  public SortedSet<byte[]> getEmptyRegionsAfterTime(long time, @Nullable SortedSet<byte[]> includeRegions)
+    throws IOException {
+    SortedSet<byte[]> emptyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    try (Table stateTable = stateTableSupplier.get()) {
+      Scan scan = new Scan(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY),
+                           EMPTY_REGION_TIME_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL);
+
+      try (ResultScanner scanner = stateTable.getScanner(scan)) {
+        Result next;
+        while ((next = scanner.next()) != null) {
+          byte[] emptyRegion = getEmptyRegionFromKey(next.getRow());
+          if (includeRegions == null || includeRegions.contains(emptyRegion)) {
+            emptyRegions.add(emptyRegion);
+          }
+        }
+      }
+    }
+    return Collections.unmodifiableSortedSet(emptyRegions);
+  }
+
+  /**
+   * Delete empty region records saved on or before the given time.
+   *
+   * @param time time in milliseconds
+   */
+  public void deleteEmptyRegionsOnOrBeforeTime(long time) throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      Scan scan = new Scan();
+      scan.setStopRow(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY));
+      scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL);
+      deleteFromScan(stateTable, scan);
+    }
+  }
+
+  @VisibleForTesting
+  void deleteFromScan(Table stateTable, Scan scan) throws IOException {
+    try (ResultScanner scanner = stateTable.getScanner(scan)) {
+      Result next;
+      while ((next = scanner.next()) != null) {
+        stateTable.delete(new Delete(next.getRow()));
+      }
+    }
+  }
+
+  private byte[] makeRegionKey(byte[] regionId) {
+    return Bytes.add(REGION_KEY_PREFIX, regionId);
+  }
+
+  private byte[] getRegionFromKey(byte[] regionKey) {
+    int prefixLen = REGION_KEY_PREFIX.length;
+    return Bytes.copy(regionKey, prefixLen, regionKey.length - prefixLen);
+  }
+
+  private byte[] makeTimeRegionKey(byte[] time, byte[] regionId) {
+    return Bytes.add(REGION_TIME_KEY_PREFIX, time, regionId);
+  }
+
+  private byte[] makeTimeRegionCountKey(byte[] time) {
+    return Bytes.add(REGION_TIME_COUNT_KEY_PREFIX, time);
+  }
+
+  private byte[] makeInactiveTransactionBoundTimeKey(byte[] time) {
+    return Bytes.add(INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX, time);
+  }
+
+  private Map.Entry<Long, byte[]> getTimeRegion(byte[] key) {
+    int offset = REGION_TIME_KEY_PREFIX.length;
+    long time = getInvertedTime(Bytes.toLong(key, offset));
+    offset += Bytes.SIZEOF_LONG;
+    byte[] regionName = Bytes.copy(key, offset, key.length - offset);
+    return Maps.immutableEntry(time, regionName);
+  }
+
+  private byte[] makeEmptyRegionTimeKey(byte[] time, byte[] regionId) {
+    return Bytes.add(EMPTY_REGION_TIME_KEY_PREFIX, time, regionId);
+  }
+
+  private byte[] getEmptyRegionFromKey(byte[] key) {
+    int prefixLen = EMPTY_REGION_TIME_KEY_PREFIX.length + Bytes.SIZEOF_LONG;
+    return Bytes.copy(key, prefixLen, key.length - prefixLen);
+  }
+
+  private long getInvertedTime(long time) {
+    return Long.MAX_VALUE - time;
+  }
+
+  /**
+   * Supplies table for persisting state
+   */
+  public interface TableSupplier {
+    Table get() throws IOException;
+  }
+}
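
The sketch below shows how a caller could wire DataJanitorState to its state table
through TableSupplier and do a save/read round trip. The connection handling and
table name are illustrative assumptions, not part of this patch:

  import java.io.IOException;

  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.Table;
  import org.apache.tephra.hbase.txprune.DataJanitorState;

  public class DataJanitorStateSketch {
    static long roundTrip(final Connection connection, byte[] regionName) throws IOException {
      final TableName stateTable = TableName.valueOf("tephra.state");
      DataJanitorState state = new DataJanitorState(new DataJanitorState.TableSupplier() {
        @Override
        public Table get() throws IOException {
          // a fresh Table per call; DataJanitorState closes it after each operation
          return connection.getTable(stateTable);
        }
      });
      state.savePruneUpperBoundForRegion(regionName, 42L);   // row key: 0x1<region-id>
      return state.getPruneUpperBoundForRegion(regionName);  // returns -1 if absent
    }
  }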

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
new file mode 100644
index 0000000..84c480a
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.txprune.TransactionPruningPlugin;
+import org.apache.tephra.util.TxUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Default implementation of the {@link TransactionPruningPlugin} for HBase.
+ *
+ * This plugin determines the prune upper bound for transactional HBase tables that use
+ * coprocessor {@link TransactionProcessor}.
+ *
+ * <h3>State storage:</h3>
+ *
+ * This plugin expects the TransactionProcessor to save the prune upper bound for invalid transactions
+ * after every major compaction of a region. Let's call this <i>(region, prune upper bound)</i>.
+ * In addition, the plugin also persists the following information on a run at time <i>t</i>
+ * <ul>
+ *   <li>
+ *     <i>(t, set of regions)</i>: Set of transactional regions at time <i>t</i>.
+ *     Transactional regions are regions of the tables that have the coprocessor TransactionProcessor
+ *     attached to them.
+ *   </li>
+ *   <li>
+ *     <i>(t, inactive transaction bound)</i>: This is the smallest not in-progress transaction that
+ *     will not have writes in any HBase regions that are created after time <i>t</i>.
+ *     This value is determined by the Transaction Service based on the transaction state at time <i>t</i>
+ *     and passed on to the plugin.
+ *   </li>
+ * </ul>
+ *
+ * <h3>Computing prune upper bound:</h3>
+ *
+ * In a typical HBase instance, there can be a constant change in the number of regions due to region creations,
+ * splits and merges. At any given time there can always be a region on which a major compaction has not been run.
+ * Since the prune upper bound will get recorded for a region only after a major compaction,
+ * using only the latest set of regions we may not be able to find the
+ * prune upper bounds for all the current regions. Hence we persist the set of regions that exist at the time
+ * of each run of the plugin, and use the historical region sets for times <i>t</i>, <i>t - 1</i>, etc.
+ * to determine the prune upper bound.
+ *
+ * From the regions saved at time <i>t</i>, <i>t - 1</i>, etc.,
+ * the plugin tries to find the latest <i>(t, set of regions)</i> where all regions have been major compacted,
+ * i.e., all regions have a prune upper bound recorded in <i>(region, prune upper bound)</i>.
+ * <br/>
+ * If such a set is found for time <i>t1</i>, the prune upper bound returned by the plugin is the minimum of
+ * <ul>
+ *   <li>Prune upper bounds of regions in set <i>(t1, set of regions)</i></li>
+ *   <li>Inactive transaction bound from <i>(t1, inactive transaction bound)</i></li>
+ * </ul>
+ *
+ * <p/>
+ * Above, when we find <i>(t1, set of regions)</i>, there may be a region that was created after time <i>t1</i>,
+ * but has a data write from an invalid transaction that is smaller than the prune upper bounds of all
+ * regions in <i>(t1, set of regions)</i>. This is possible because <i>(region, prune upper bound)</i> persisted by
+ * TransactionProcessor is always the latest prune upper bound for a region.
+ * <br/>
+ * However a region created after time <i>t1</i> cannot have writes from an invalid transaction that is smaller than
+ * inactive transaction bound at the time the region was created.
+ * Since we limit the plugin prune upper bound using <i>(t1, inactive transaction bound)</i>,
+ * there should be no invalid transactions smaller than the plugin prune upper bound with writes in any
+ * transactional region of this HBase instance.
+ *
+ * <p/>
+ * Note: If your tables use a transactional coprocessor other than TransactionProcessor,
+ * then you may need to write a new plugin to compute the prune upper bound for those tables.
+ */
+@SuppressWarnings("WeakerAccess")
+public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
+  public static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionPruningPlugin.class);
+
+  protected Configuration conf;
+  protected Connection connection;
+  protected DataJanitorState dataJanitorState;
+
+  @Override
+  public void initialize(Configuration conf) throws IOException {
+    this.conf = conf;
+    this.connection = ConnectionFactory.createConnection(conf);
+
+    final TableName stateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                                            TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+    LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
+    createPruneTable(stateTable);
+    this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+      @Override
+      public Table get() throws IOException {
+        return connection.getTable(stateTable);
+      }
+    });
+  }
+
+  /**
+   * Determines prune upper bound for the data store as mentioned above.
+   */
+  @Override
+  public long fetchPruneUpperBound(long time, long inactiveTransactionBound) throws IOException {
+    LOG.debug("Fetching prune upper bound for time {} and inactive transaction bound {}",
+              time, inactiveTransactionBound);
+    if (time < 0 || inactiveTransactionBound < 0) {
+      return -1;
+    }
+
+    // Get all the current transactional regions
+    SortedSet<byte[]> transactionalRegions = getTransactionalRegions();
+    if (!transactionalRegions.isEmpty()) {
+      LOG.debug("Saving {} transactional regions for time {}", transactionalRegions.size(), time);
+      dataJanitorState.saveRegionsForTime(time, transactionalRegions);
+      // Save inactive transaction bound for time as the final step.
+      // We can then use its existence to determine whether the data for a given time is complete
+      LOG.debug("Saving inactive transaction bound {} for time {}", inactiveTransactionBound, time);
+      dataJanitorState.saveInactiveTransactionBoundForTime(time, inactiveTransactionBound);
+    }
+
+    return computePruneUpperBound(new TimeRegions(time, transactionalRegions));
+  }
+
+  /**
+   * After the invalid list has been pruned, this cleans up state information that is no longer required.
+   * This includes:
+   * <ul>
+   *   <li>
+   *     <i>(region, prune upper bound)</i> - prune upper bound for regions that are older
+   *     than maxPrunedInvalid
+   *   </li>
+   *   <li>
+   *     <i>(t, set of regions)</i> - region sets that were recorded on or before the start time
+   *     of maxPrunedInvalid
+   *   </li>
+   *   <li>
+   *     <i>(t, inactive transaction bound)</i> - smallest not in-progress transaction without any writes
+   *     in new regions, recorded on or before the start time of maxPrunedInvalid
+   *   </li>
+   * </ul>
+   */
+  @Override
+  public void pruneComplete(long time, long maxPrunedInvalid) throws IOException {
+    LOG.debug("Prune complete for time {} and prune upper bound {}", time, maxPrunedInvalid);
+    if (time < 0 || maxPrunedInvalid < 0) {
+      return;
+    }
+
+    // Get regions for the current time, so as to not delete the prune upper bounds for them.
+    // The prune upper bounds for regions are recorded by TransactionProcessor and the deletion
+    // is done by this class. To avoid update/delete race condition, we only delete prune upper
+    // bounds for the stale regions.
+    TimeRegions regionsToExclude = dataJanitorState.getRegionsOnOrBeforeTime(time);
+    if (regionsToExclude != null) {
+      LOG.debug("Deleting prune upper bounds smaller than {} for stale regions", maxPrunedInvalid);
+      dataJanitorState.deletePruneUpperBounds(maxPrunedInvalid, regionsToExclude.getRegions());
+    } else {
+      LOG.warn("Cannot find saved regions on or before time {}", time);
+    }
+    long pruneTime = TxUtils.getTimestamp(maxPrunedInvalid);
+    LOG.debug("Deleting regions recorded before time {}", pruneTime);
+    dataJanitorState.deleteAllRegionsOnOrBeforeTime(pruneTime);
+    LOG.debug("Deleting inactive transaction bounds recorded on or before time {}", pruneTime);
+    dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(pruneTime);
+    LOG.debug("Deleting empty regions recorded on or before time {}", pruneTime);
+    dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(pruneTime);
+  }
+
+  @Override
+  public void destroy() {
+    LOG.info("Stopping plugin...");
+    try {
+      connection.close();
+    } catch (IOException e) {
+      LOG.error("Got exception while closing HBase connection", e);
+    }
+  }
+
+  /**
+   * Create the prune state table given the {@link TableName} if the table doesn't exist already.
+   *
+   * @param stateTable prune state table name
+   */
+  protected void createPruneTable(TableName stateTable) throws IOException {
+    try (Admin admin = this.connection.getAdmin()) {
+      if (admin.tableExists(stateTable)) {
+        LOG.debug("Not creating pruneStateTable {} since it already exists.",
+                  stateTable.getNameWithNamespaceInclAsString());
+        return;
+      }
+
+      HTableDescriptor htd = new HTableDescriptor(stateTable);
+      htd.addFamily(new HColumnDescriptor(DataJanitorState.FAMILY).setMaxVersions(1));
+      admin.createTable(htd);
+      LOG.info("Created pruneTable {}", stateTable.getNameWithNamespaceInclAsString());
+    } catch (TableExistsException ex) {
+      // Expected if the prune state table is being created at the same time by another client
+      LOG.debug("Not creating pruneStateTable {} since it already exists.",
+                stateTable.getNameWithNamespaceInclAsString(), ex);
+    }
+  }
+
+  /**
+   * Returns whether the table is a transactional table. By default, a table is identified as a transactional
+   * table if it has the coprocessor {@link TransactionProcessor} attached to it. Should be overridden if users
+   * attach a different coprocessor.
+   *
+   * @param tableDescriptor {@link HTableDescriptor} of the table
+   * @return true if the table is transactional
+   */
+  protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
+    return tableDescriptor.hasCoprocessor(TransactionProcessor.class.getName());
+  }
+
+  protected SortedSet<byte[]> getTransactionalRegions() throws IOException {
+    SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    try (Admin admin = connection.getAdmin()) {
+      HTableDescriptor[] tableDescriptors = admin.listTables();
+      LOG.debug("Got {} tables to process", tableDescriptors == null ? 0 : tableDescriptors.length);
+      if (tableDescriptors != null) {
+        for (HTableDescriptor tableDescriptor : tableDescriptors) {
+          if (isTransactionalTable(tableDescriptor)) {
+            List<HRegionInfo> tableRegions = admin.getTableRegions(tableDescriptor.getTableName());
+            LOG.debug("Regions for table {}: {}", tableDescriptor.getTableName(), tableRegions);
+            if (tableRegions != null) {
+              for (HRegionInfo region : tableRegions) {
+                regions.add(region.getRegionName());
+              }
+            }
+          } else {
+            LOG.debug("{} is not a transactional table", tableDescriptor.getTableName());
+          }
+        }
+      }
+    }
+    return regions;
+  }
+
+  /**
+   * Try to find the latest set of regions in which all regions have been major compacted, and
+   * compute prune upper bound from them. Starting from newest to oldest, this looks into the
+   * region set that has been saved periodically, and joins it with the prune upper bound data
+   * for a region recorded after a major compaction.
+   *
+   * @param timeRegions the latest set of regions
+   * @return prune upper bound
+   * @throws IOException when not able to talk to HBase
+   */
+  private long computePruneUpperBound(TimeRegions timeRegions) throws IOException {
+    do {
+      LOG.debug("Computing prune upper bound for {}", timeRegions);
+      SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
+      long time = timeRegions.getTime();
+
+      long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
+      LOG.debug("Got inactive transaction bound {}", inactiveTransactionBound);
+      // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
+      if (inactiveTransactionBound == -1) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
+                      "and hence the data must be incomplete", time);
+        }
+        // Fetch the next older region set before continuing; otherwise this do-while would
+        // re-evaluate the same timeRegions forever, since continue skips the update at the bottom
+        timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time - 1);
+        continue;
+      }
+
+      // Get the prune upper bounds for all the transactional regions
+      Map<byte[], Long> pruneUpperBoundRegions =
+        dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
+      logPruneUpperBoundRegions(pruneUpperBoundRegions);
+
+      // Use inactiveTransactionBound as the prune upper bound for the empty regions since the regions that are
+      // recorded as empty after inactiveTransactionBoundTime will not have invalid data
+      // for transactions started on or before inactiveTransactionBoundTime
+      pruneUpperBoundRegions = handleEmptyRegions(inactiveTransactionBound, transactionalRegions,
+                                                  pruneUpperBoundRegions);
+
+      // If prune upper bounds are found for all the transactional regions, then compute the prune upper bound
+      // across all regions
+      if (!transactionalRegions.isEmpty() &&
+        pruneUpperBoundRegions.size() == transactionalRegions.size()) {
+        Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
+        long pruneUpperBound = Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
+        LOG.debug("Found prune upper bound {} for time {}", pruneUpperBound, time);
+        return pruneUpperBound;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          Sets.SetView<byte[]> difference =
+            Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
+          LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}",
+                    time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN));
+        }
+      }
+
+      timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time - 1);
+    } while (timeRegions != null);
+    return -1;
+  }
+
+  private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound,
+                                               SortedSet<byte[]> transactionalRegions,
+                                               Map<byte[], Long> pruneUpperBoundRegions) throws IOException {
+    long inactiveTransactionBoundTime = TxUtils.getTimestamp(inactiveTransactionBound);
+    SortedSet<byte[]> emptyRegions =
+      dataJanitorState.getEmptyRegionsAfterTime(inactiveTransactionBoundTime, transactionalRegions);
+    LOG.debug("Got empty transactional regions for inactive transaction bound time {}: {}",
+              inactiveTransactionBoundTime, Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
+
+    // The regions that are recorded as empty after inactiveTransactionBoundTime will not have invalid data
+    // for transactions started before or on inactiveTransactionBoundTime. Hence we can consider the prune upper bound
+    // for these empty regions as inactiveTransactionBound
+    Map<byte[], Long> pubWithEmptyRegions = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    pubWithEmptyRegions.putAll(pruneUpperBoundRegions);
+    for (byte[] emptyRegion : emptyRegions) {
+      if (!pruneUpperBoundRegions.containsKey(emptyRegion)) {
+        pubWithEmptyRegions.put(emptyRegion, inactiveTransactionBound);
+      }
+    }
+    return Collections.unmodifiableMap(pubWithEmptyRegions);
+  }
+
+  private void logPruneUpperBoundRegions(Map<byte[], Long> pruneUpperBoundRegions) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Got region - prune upper bound map: {}",
+                Iterables.transform(pruneUpperBoundRegions.entrySet(),
+                                    new Function<Map.Entry<byte[], Long>, Map.Entry<String, Long>>() {
+                                      @Override
+                                      public Map.Entry<String, Long> apply(Map.Entry<byte[], Long> input) {
+                                        String regionName = TimeRegions.BYTE_ARR_TO_STRING_FN.apply(input.getKey());
+                                        return Maps.immutableEntry(regionName, input.getValue());
+                                      }
+                                    }));
+    }
+  }
+}
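
For a concrete sense of the computation above: once every transactional region in a
snapshot has recorded a bound, the global prune upper bound is simply the minimum of the
inactive transaction bound and the smallest per-region bound. A minimal, self-contained
sketch of that final step (all values and the class name are hypothetical, not part of
the patch):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class PruneUpperBoundExample {
      public static void main(String[] args) {
        long inactiveTransactionBound = 5000L;                         // assumed recorded bound
        List<Long> regionBounds = Arrays.asList(5200L, 4800L, 5100L);  // assumed per-region bounds
        long pruneUpperBound = Math.min(inactiveTransactionBound, Collections.min(regionBounds));
        System.out.println(pruneUpperBound);                           // prints 4800
      }
    }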

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
new file mode 100644
index 0000000..443c998
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.txprune.RegionPruneInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import javax.annotation.Nullable;
+
+/**
+ * Invalid List Pruning Debug Tool.
+ */
+public class InvalidListPruningDebug {
+  private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class);
+  private static final Gson GSON = new Gson();
+  private DataJanitorState dataJanitorState;
+  private Connection connection;
+  private TableName tableName;
+
+  /**
+   * Initialize the Invalid List Pruning Debug Tool.
+   * @param conf {@link Configuration}
+   * @throws IOException when not able to talk to HBase
+   */
+  public void initialize(final Configuration conf) throws IOException {
+    LOG.debug("InvalidListPruningDebugMain : initialize method called");
+    connection = ConnectionFactory.createConnection(conf);
+    tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                           TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+    dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+      @Override
+      public Table get() throws IOException {
+        return connection.getTable(tableName);
+      }
+    });
+  }
+
+  public void destroy() throws IOException {
+    if (connection != null) {
+      connection.close();
+    }
+  }
+
+  /**
+   * Returns the set of live regions that are not empty and have no prune upper bound recorded.
+   * These regions stall the progress of pruning.
+   *
+   * @param numRegions maximum number of regions to return; -1 returns all such regions
+   * @return {@link Set} of regions that need to be compacted and flushed
+   */
+  public Set<String> getRegionsToBeCompacted(Integer numRegions) throws IOException {
+    // Fetch the live regions
+    Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+    if (latestTimeRegion.isEmpty()) {
+      return new HashSet<>();
+    }
+
+    Long timestamp = latestTimeRegion.keySet().iterator().next();
+    SortedSet<String> liveRegions = latestTimeRegion.get(timestamp);
+
+    SortedSet<byte[]> emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null);
+    SortedSet<String> emptyRegionNames = new TreeSet<>();
+    Iterable<String> regionStrings = Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN);
+    for (String regionString : regionStrings) {
+      emptyRegionNames.add(regionString);
+    }
+
+    Set<String> nonEmptyRegions = Sets.newHashSet(Sets.difference(liveRegions, emptyRegionNames));
+
+    // Get all pruned regions and remove them from the nonEmptyRegions, resulting in a set of regions
+    // that are not empty and have not registered a prune upper bound
+    Queue<RegionPruneInfo> prunedRegions = getIdleRegions(-1);
+    for (RegionPruneInfo prunedRegion : prunedRegions) {
+      if (nonEmptyRegions.contains(prunedRegion.getRegionNameAsString())) {
+        nonEmptyRegions.remove(prunedRegion.getRegionNameAsString());
+      }
+    }
+
+    if ((numRegions < 0) || (numRegions >= nonEmptyRegions.size())) {
+      return nonEmptyRegions;
+    }
+
+    Set<String> subsetRegions = new HashSet<>(numRegions);
+    for (String regionName : nonEmptyRegions) {
+      if (subsetRegions.size() == numRegions) {
+        break;
+      }
+      subsetRegions.add(regionName);
+    }
+    return subsetRegions;
+  }
+
+  /**
+   * Return a list of {@link RegionPruneInfo} for the regions that have the lowest prune upper bounds.
+   * If -1 is passed in, all regions and their prune upper bounds will be returned. Note that only
+   * regions that are known to be live will be returned.
+   *
+   * @param numRegions number of regions to return; -1 returns all
+   * @return queue of {@link RegionPruneInfo}, ordered by ascending prune upper bound
+   */
+  public Queue<RegionPruneInfo> getIdleRegions(Integer numRegions) throws IOException {
+    List<RegionPruneInfo> regionPruneInfos = dataJanitorState.getPruneInfoForRegions(null);
+    if (regionPruneInfos.isEmpty()) {
+      return new LinkedList<>();
+    }
+
+    // Create a set with region names
+    Set<String> pruneRegionNameSet = new HashSet<>();
+    for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+      pruneRegionNameSet.add(regionPruneInfo.getRegionNameAsString());
+    }
+
+    // Fetch the live regions
+    Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+    if (!latestTimeRegion.isEmpty()) {
+      SortedSet<String> liveRegions = latestTimeRegion.values().iterator().next();
+      Set<String> liveRegionsWithPruneInfo = Sets.intersection(liveRegions, pruneRegionNameSet);
+      List<RegionPruneInfo> liveRegionWithPruneInfoList = new ArrayList<>();
+      for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+        if (liveRegionsWithPruneInfo.contains(regionPruneInfo.getRegionNameAsString())) {
+          liveRegionWithPruneInfoList.add(regionPruneInfo);
+        }
+      }
+
+      // Use the subset of live regions and prune regions
+      regionPruneInfos = liveRegionWithPruneInfoList;
+    }
+
+    if (numRegions < 0) {
+      numRegions = regionPruneInfos.size();
+    }
+
+    Queue<RegionPruneInfo> lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator<RegionPruneInfo>() {
+      @Override
+      public int compare(RegionPruneInfo o1, RegionPruneInfo o2) {
+        // Long.compare avoids the overflow possible when casting a long difference to int
+        return Long.compare(o1.getPruneUpperBound(), o2.getPruneUpperBound());
+      }
+    }).maximumSize(numRegions).create();
+
+    for (RegionPruneInfo pruneInfo : regionPruneInfos) {
+      lowestPrunes.add(pruneInfo);
+    }
+    return lowestPrunes;
+  }
+
+  /**
+   * Return the prune upper bound value of a given region. If no prune upper bound has been
+   * written for this region yet, returns null.
+   *
+   * @param regionId region id
+   * @return {@link RegionPruneInfo} of the region, or null if none is recorded
+   * @throws IOException if there are any errors while trying to fetch the {@link RegionPruneInfo}
+   */
+  @Nullable
+  public RegionPruneInfo getRegionPruneInfo(String regionId) throws IOException {
+    return dataJanitorState.getPruneInfoForRegion(Bytes.toBytesBinary(regionId));
+  }
+
+  /**
+   * Returns the transactional regions recorded at or before the given time.
+   * @param time time in milliseconds
+   * @return map of the recorded time to the set of transactional regions present at or before that time
+   * @throws IOException if there are any errors while trying to fetch the {@link TimeRegions}
+   */
+  public Map<Long, SortedSet<String>> getRegionsOnOrBeforeTime(Long time) throws IOException {
+    Map<Long, SortedSet<String>> regionMap = new HashMap<>();
+    TimeRegions timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time);
+    if (timeRegions == null) {
+      return regionMap;
+    }
+    SortedSet<String> regionNames = new TreeSet<>();
+    Iterable<String> regionStrings = Iterables.transform(timeRegions.getRegions(), TimeRegions.BYTE_ARR_TO_STRING_FN);
+    for (String regionString : regionStrings) {
+      regionNames.add(regionString);
+    }
+    regionMap.put(timeRegions.getTime(), regionNames);
+    return regionMap;
+  }
+
+  private void printUsage(PrintWriter pw) {
+    pw.println("Usage : org.apache.tephra.hbase.txprune.InvalidListPruning <command> <parameter>");
+    pw.println("Available commands, corresponding parameters are:");
+    pw.println("****************************************************");
+    pw.println("time-region ts");
+    pw.println("Desc: Prints out the transactional regions present in HBase at time 'ts' (in milliseconds) " +
+                 "or the latest time before time 'ts'.");
+    pw.println("idle-regions limit");
+    pw.println("Desc: Prints out 'limit' number of regions which has the lowest prune upper bounds. If '-1' is " +
+                 "provided as the limit, prune upper bounds of all regions are returned.");
+    pw.println("prune-info region-name-as-string");
+    pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'");
+    pw.println("to-compact-regions limit");
+    pw.println("Desc: Prints out 'limit' number of regions that are active, but are not empty, " +
+                 "and have not registered a prune upper bound.");
+  }
+
+  private boolean execute(String[] args) throws IOException {
+    try (PrintWriter pw = new PrintWriter(System.out)) {
+      if (args.length != 2) {
+        printUsage(pw);
+        return false;
+      }
+
+      String command = args[0];
+      String parameter = args[1];
+      if ("time-region".equals(command)) {
+        Long time = Long.parseLong(parameter);
+        Map<Long, SortedSet<String>> timeRegion = getRegionsOnOrBeforeTime(time);
+        pw.println(GSON.toJson(timeRegion));
+        return true;
+      } else if ("idle-regions".equals(command)) {
+        Integer numRegions = Integer.parseInt(parameter);
+        Queue<RegionPruneInfo> regionPruneInfos = getIdleRegions(numRegions);
+        pw.println(GSON.toJson(regionPruneInfos));
+        return true;
+      } else if ("prune-info".equals(command)) {
+        RegionPruneInfo regionPruneInfo = getRegionPruneInfo(parameter);
+        if (regionPruneInfo != null) {
+          pw.println(GSON.toJson(regionPruneInfo));
+        } else {
+          pw.println(String.format("No prune info found for the region %s.", parameter));
+        }
+        return true;
+      } else if ("to-compact-regions".equals(command)) {
+        Integer numRegions = Integer.parseInt(parameter);
+        Set<String> toBeCompactedRegions = getRegionsToBeCompacted(numRegions);
+        pw.println(GSON.toJson(toBeCompactedRegions));
+        return true;
+      } else {
+        pw.println(String.format("%s is not a valid command.", command));
+        printUsage(pw);
+        return false;
+      }
+    }
+  }
+
+  public static void main(String[] args) {
+    Configuration hConf = HBaseConfiguration.create();
+    InvalidListPruningDebug pruningDebug = new InvalidListPruningDebug();
+    try {
+      pruningDebug.initialize(hConf);
+      boolean success = pruningDebug.execute(args);
+      pruningDebug.destroy();
+      if (!success) {
+        System.exit(1);
+      }
+    } catch (IOException ex) {
+      LOG.error("Received an exception while trying to execute the debug tool. ", ex);
+    }
+  }
+}
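
Assuming the Tephra and HBase client jars are on the classpath (the placeholder below is
not a real path), the tool can be invoked with any of the four commands documented in
printUsage, for example:

    java -cp <tephra-and-hbase-classpath> \
      org.apache.tephra.hbase.txprune.InvalidListPruningDebug idle-regions 10

This prints, as JSON, up to ten live regions with the lowest prune upper bounds; the
time-region, prune-info, and to-compact-regions commands take a timestamp in
milliseconds, a region name, and a limit respectively.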

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
new file mode 100644
index 0000000..677710b
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Thread that will write the prune upper bound. An instance of this class should be obtained only
+ * through {@link PruneUpperBoundWriterSupplier}, which will also handle the lifecycle of this instance.
+ */
+public class PruneUpperBoundWriter extends AbstractIdleService {
+  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
+
+  private final TableName tableName;
+  private final DataJanitorState dataJanitorState;
+  private final long pruneFlushInterval;
+  // Map of region name -> prune upper bound
+  private final ConcurrentSkipListMap<byte[], Long> pruneEntries;
+  // Map of region name -> time the region was found to be empty
+  private final ConcurrentSkipListMap<byte[], Long> emptyRegions;
+
+  private volatile Thread flushThread;
+  private volatile boolean stopped;
+
+  private long lastChecked;
+
+  @SuppressWarnings("WeakerAccess")
+  public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) {
+    this.tableName = tableName;
+    this.dataJanitorState = dataJanitorState;
+    this.pruneFlushInterval = pruneFlushInterval;
+    this.pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+    this.emptyRegions = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+  }
+
+  @SuppressWarnings("WeakerAccess")
+  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+    warnIfNotRunning(regionName);
+    // The number of entries in this map is bound by the number of regions in this region server and thus it will not
+    // grow indefinitely
+    pruneEntries.put(regionName, pruneUpperBound);
+  }
+
+  @SuppressWarnings("WeakerAccess")
+  public void persistRegionEmpty(byte[] regionName, long time) {
+    warnIfNotRunning(regionName);
+    // The number of entries in this map is bound by the number of regions in this region server and thus it will not
+    // grow indefinitely
+    emptyRegions.put(regionName, time);
+  }
+
+  @SuppressWarnings("WeakerAccess")
+  public boolean isAlive() {
+    return flushThread != null && flushThread.isAlive();
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    LOG.info("Starting PruneUpperBoundWriter Thread.");
+    startFlushThread();
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    LOG.info("Stopping PruneUpperBoundWriter Thread.");
+    stopped = true;
+    if (flushThread != null) {
+      flushThread.interrupt();
+      flushThread.join(TimeUnit.SECONDS.toMillis(1));
+      if (flushThread.isAlive()) {
+        flushThread.interrupt();
+        flushThread.join(TimeUnit.SECONDS.toMillis(1));
+      }
+    }
+  }
+
+  private void startFlushThread() {
+    flushThread = new Thread("tephra-prune-upper-bound-writer") {
+      @Override
+      public void run() {
+        while ((!isInterrupted()) && (!stopped)) {
+          long now = System.currentTimeMillis();
+          if (now > (lastChecked + pruneFlushInterval)) {
+            // should flush data
+            try {
+              User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                  // Record prune upper bound
+                  while (!pruneEntries.isEmpty()) {
+                    Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
+                    dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
+                    // We can now remove the entry only if the key and value match with what we wrote since it is
+                    // possible that a new pruneUpperBound for the same key has been added
+                    pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
+                  }
+                  // Record empty regions
+                  while (!emptyRegions.isEmpty()) {
+                    Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
+                    dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
+                    // We can now remove the entry only if the key and value match with what we wrote since it is
+                    // possible that a new value for the same key has been added
+                    emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
+                  }
+                  return null;
+                }
+              });
+            } catch (IOException ex) {
+              LOG.warn("Cannot record prune upper bound for a region to table " +
+                         tableName.getNameWithNamespaceInclAsString(), ex);
+            }
+            lastChecked = now;
+          }
+
+          try {
+            TimeUnit.SECONDS.sleep(1);
+          } catch (InterruptedException ex) {
+            interrupt();
+            break;
+          }
+        }
+
+        LOG.info("PruneUpperBound Writer thread terminated.");
+      }
+    };
+
+    flushThread.setDaemon(true);
+    flushThread.start();
+  }
+
+  private void warnIfNotRunning(byte[] regionName) {
+    if (!isRunning() || !isAlive()) {
+      LOG.warn(String.format("Trying to persist prune upper bound for region %s when writer is not %s!",
+                             Bytes.toStringBinary(regionName), isRunning() ? "alive" : "running"));
+    }
+  }
+}
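
The flush loop above leans on the two-argument ConcurrentMap.remove(key, value): an entry
is dropped only if it still holds the value that was just persisted, so a newer bound
written concurrently for the same region survives until the next flush. A minimal,
self-contained illustration (class and key names are hypothetical):

    import java.util.concurrent.ConcurrentSkipListMap;

    public class RemoveIfUnchangedExample {
      public static void main(String[] args) {
        ConcurrentSkipListMap<String, Long> entries = new ConcurrentSkipListMap<>();
        entries.put("region-a", 100L);
        // ... 100L is persisted to the state table here ...
        entries.put("region-a", 200L);                      // a newer bound arrives concurrently
        boolean removed = entries.remove("region-a", 100L); // only removes if the value is still 100L
        System.out.println(removed);                        // false: 200L is kept for the next flush
      }
    }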

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
new file mode 100644
index 0000000..cb93fab
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+
+import com.google.common.base.Supplier;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.coprocessor.CacheSupplier;
+import org.apache.tephra.coprocessor.ReferenceCountedSupplier;
+
+/**
+ * Supplies reference-counted instances of {@link PruneUpperBoundWriter}.
+ */
+public class PruneUpperBoundWriterSupplier implements CacheSupplier<PruneUpperBoundWriter> {
+
+  private static final ReferenceCountedSupplier<PruneUpperBoundWriter> referenceCountedSupplier =
+    new ReferenceCountedSupplier<>(PruneUpperBoundWriter.class.getSimpleName());
+
+  private final Supplier<PruneUpperBoundWriter> supplier;
+
+  public PruneUpperBoundWriterSupplier(final TableName tableName, final DataJanitorState dataJanitorState,
+                                       final long pruneFlushInterval) {
+    this.supplier = new Supplier<PruneUpperBoundWriter>() {
+      @Override
+      public PruneUpperBoundWriter get() {
+        return new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
+      }
+    };
+  }
+
+  @Override
+  public PruneUpperBoundWriter get() {
+    return referenceCountedSupplier.getOrCreate(supplier);
+  }
+
+  public void release() {
+    referenceCountedSupplier.release();
+  }
+}
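
Since the supplier is reference counted, every get() should be paired with a release();
on the last release the underlying shared writer can be disposed. A sketch of the
intended pairing, assuming the caller already holds tableName, dataJanitorState,
regionName, and pruneUpperBound values (none of this snippet is taken from the patch):

    PruneUpperBoundWriterSupplier supplier =
        new PruneUpperBoundWriterSupplier(tableName, dataJanitorState, 60 * 1000L);
    PruneUpperBoundWriter writer = supplier.get();  // increments the reference count
    try {
      writer.persistPruneEntry(regionName, pruneUpperBound);
    } finally {
      supplier.release();                           // decrements; the last release stops the shared writer
    }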

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java
new file mode 100644
index 0000000..4ac8887
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.Objects;
+import java.util.SortedSet;
+
+/**
+ * Contains information on the set of transactional regions recorded at a given time.
+ */
+@SuppressWarnings("WeakerAccess")
+public class TimeRegions {
+  static final Function<byte[], String> BYTE_ARR_TO_STRING_FN =
+    new Function<byte[], String>() {
+      @Override
+      public String apply(byte[] input) {
+        return Bytes.toStringBinary(input);
+      }
+    };
+
+  private final long time;
+  private final SortedSet<byte[]> regions;
+
+  public TimeRegions(long time, SortedSet<byte[]> regions) {
+    this.time = time;
+    this.regions = regions;
+  }
+
+  public long getTime() {
+    return time;
+  }
+
+  public SortedSet<byte[]> getRegions() {
+    return regions;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    TimeRegions that = (TimeRegions) o;
+    return time == that.time &&
+      Objects.equals(regions, that.regions);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(time, regions);
+  }
+
+  @Override
+  public String toString() {
+    Iterable<String> regionStrings = Iterables.transform(regions, BYTE_ARR_TO_STRING_FN);
+    return "TimeRegions{" +
+      "time=" + time +
+      ", regions=[" + Joiner.on(" ").join(regionStrings) + "]" +
+      '}';
+  }
+}
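
One caveat for callers of the constructor above: byte[] uses identity equality and has no
natural ordering, so any region set handed to TimeRegions should be built with
Bytes.BYTES_COMPARATOR, as the plugin code in this patch does. A minimal sketch (class
name hypothetical):

    import java.util.SortedSet;
    import java.util.TreeSet;
    import org.apache.hadoop.hbase.util.Bytes;

    public class TimeRegionsExample {
      public static void main(String[] args) {
        SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
        regions.add(Bytes.toBytes("region-b"));
        regions.add(Bytes.toBytes("region-a"));
        TimeRegions snapshot = new TimeRegions(System.currentTimeMillis(), regions);
        System.out.println(snapshot);  // regions appear in lexicographic byte order
      }
    }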