You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by go...@apache.org on 2017/02/01 19:34:15 UTC

[1/2] incubator-tephra git commit: TEPHRA-212 Perform writes to prune state asynchronously Pass OperationWithAttributes to ensureValidTxLifetime Add creation of prune State Table

Repository: incubator-tephra
Updated Branches:
  refs/heads/master 87cb21a0f -> 0016b2034


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
new file mode 100644
index 0000000..c981e15
--- /dev/null
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Thread that will write the the prune upper bound
+ */
+public class PruneUpperBoundWriter {
+  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
+
+  private final TableName pruneStateTable;
+  private final DataJanitorState dataJanitorState;
+  private final byte[] regionName;
+  private final String regionNameAsString;
+  private final long pruneFlushInterval;
+  private final AtomicLong pruneUpperBound;
+  private final AtomicBoolean shouldFlush;
+
+  private Thread flushThread;
+  private long lastChecked;
+
+  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString,
+                               byte[] regionName, long pruneFlushInterval) {
+    this.pruneStateTable = pruneStateTable;
+    this.dataJanitorState = dataJanitorState;
+    this.regionName = regionName;
+    this.regionNameAsString = regionNameAsString;
+    this.pruneFlushInterval = pruneFlushInterval;
+    this.pruneUpperBound = new AtomicLong();
+    this.shouldFlush = new AtomicBoolean(false);
+    startFlushThread();
+  }
+
+  public boolean isAlive() {
+    return flushThread.isAlive();
+  }
+
+  public void persistPruneEntry(long pruneUpperBound) {
+    this.pruneUpperBound.set(pruneUpperBound);
+    this.shouldFlush.set(true);
+  }
+
+  public void stop() {
+    if (flushThread != null) {
+      flushThread.interrupt();
+    }
+  }
+
+  private void startFlushThread() {
+    flushThread = new Thread("tephra-prune-upper-bound-writer") {
+      @Override
+      public void run() {
+        while (!isInterrupted()) {
+          long now = System.currentTimeMillis();
+          if (now > (lastChecked + pruneFlushInterval)) {
+            if (shouldFlush.compareAndSet(true, false)) {
+              // should flush data
+              try {
+                dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound.get());
+              } catch (IOException ex) {
+                LOG.warn("Cannot record prune upper bound for region " + regionNameAsString + " in the table " +
+                           pruneStateTable.getNamespaceAsString() + ":" + pruneStateTable.getNameAsString() +
+                           " after compacting region.", ex);
+                // Retry again
+                shouldFlush.set(true);
+              }
+            }
+            lastChecked = now;
+          }
+
+          try {
+            TimeUnit.SECONDS.sleep(1);
+          } catch (InterruptedException ex) {
+            interrupt();
+            break;
+          }
+        }
+
+        LOG.info("PruneUpperBound Writer thread terminated.");
+      }
+    };
+
+    flushThread.setDaemon(true);
+    flushThread.start();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index 310c710..a431ee3 100644
--- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -59,6 +59,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -78,6 +79,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     // Setup the configuration to start HBase cluster with the invalid list pruning enabled
     conf = HBaseConfiguration.create();
     conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
+    // Flush prune data to table quickly, so that tests don't need have to wait long to see updates
+    conf.setLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, 0L);
     AbstractHBaseTableTest.startMiniCluster();
 
     TransactionStateStorage txStateStorage = new InMemoryTransactionStateStorage();
@@ -135,6 +138,15 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     }
   }
 
+  private void truncatePruneStateTable() throws Exception {
+    if (hBaseAdmin.tableExists(pruneStateTable)) {
+      if (hBaseAdmin.isTableEnabled(pruneStateTable)) {
+        hBaseAdmin.disableTable(pruneStateTable);
+      }
+      hBaseAdmin.truncateTable(pruneStateTable, true);
+    }
+  }
+
   @Test
   public void testRecordCompactionState() throws Exception {
     DataJanitorState dataJanitorState =
@@ -145,6 +157,13 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
         }
       });
 
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
+    // Truncate prune state table to clear any data that might have been written by the previous test
+    // This is required because during the shutdown of the previous test, compaction might have kicked in and the
+    // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
+    truncatePruneStateTable();
+
     // No prune upper bound initially
     Assert.assertEquals(-1,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
@@ -155,17 +174,23 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
                               ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
     // Run minor compaction
     testUtil.compact(txDataTable1, false);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     // No prune upper bound after minor compaction too
     Assert.assertEquals(-1,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
 
     // Run major compaction, and verify prune upper bound
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     Assert.assertEquals(50,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
 
     // Run major compaction again with same snapshot, prune upper bound should not change
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     Assert.assertEquals(50,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
 
@@ -179,6 +204,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
 
     // Run major compaction again, now prune upper bound should change
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     Assert.assertEquals(104,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
   }
@@ -196,6 +223,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     // Run major compaction, and verify it completes
     long now = System.currentTimeMillis();
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
     Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime),
                       lastMajorCompactionTime >= now);
@@ -209,6 +238,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     // Run major compaction, and verify it completes
     long now = System.currentTimeMillis();
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
     Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime),
                       lastMajorCompactionTime >= now);
@@ -226,6 +257,14 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
 
     TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
     transactionPruningPlugin.initialize(conf);
+
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
+    // Truncate prune state table to clear any data that might have been written by the previous test
+    // This is required because during the shutdown of the previous test, compaction might have kicked in and the
+    // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
+    truncatePruneStateTable();
+
     try {
       // Run without a transaction snapshot first
       long now1 = 200;
@@ -270,6 +309,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
                           .add(getRegionName(txDataTable1, Bytes.toBytes(0)))
                           .build());
       testUtil.compact(txDataTable1, true);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
       long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
       Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 45eed50..5a355e6 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -170,7 +170,9 @@ public class TransactionProcessor extends BaseRegionObserver {
 
   @Override
   public void stop(CoprocessorEnvironment e) throws IOException {
-    // nothing to do
+    if (compactionState != null) {
+      compactionState.stop();
+    }
   }
 
   @Override
@@ -191,7 +193,7 @@ public class TransactionProcessor extends BaseRegionObserver {
   public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)
     throws IOException {
     Transaction tx = getFromOperation(put);
-    ensureValidTxLifetime(e.getEnvironment(), tx);
+    ensureValidTxLifetime(e.getEnvironment(), put, tx);
   }
 
   @Override
@@ -208,7 +210,7 @@ public class TransactionProcessor extends BaseRegionObserver {
     }
 
     Transaction tx = getFromOperation(delete);
-    ensureValidTxLifetime(e.getEnvironment(), tx);
+    ensureValidTxLifetime(e.getEnvironment(), delete, tx);
 
     // Other deletes are client-initiated and need to be translated into our own tombstones
     // TODO: this should delegate to the DeleteStrategy implementation.
@@ -322,11 +324,16 @@ public class TransactionProcessor extends BaseRegionObserver {
       if (conf != null) {
         pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
                                       TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
-        String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
-        compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable));
-        LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " +
-                    pruneTable);
+        if (Boolean.TRUE.equals(pruneEnable)) {
+          String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                       TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+          long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
+            conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+                         TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+          compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+          LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+                      + pruneTable);
+        }
       }
     }
 
@@ -390,11 +397,13 @@ public class TransactionProcessor extends BaseRegionObserver {
    * Make sure that the transaction is within the max valid transaction lifetime.
    *
    * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated
+   * @param op {@link OperationWithAttributes} HBase operation to access its attributes if required
    * @param tx {@link Transaction} supplied by the
    * @throws DoNotRetryIOException thrown if the transaction is older than the max lifetime of a transaction
    *         IOException throw if the value of max lifetime of transaction is unavailable
    */
   protected void ensureValidTxLifetime(RegionCoprocessorEnvironment env,
+                                       @SuppressWarnings("unused") OperationWithAttributes op,
                                        @Nullable Transaction tx) throws IOException {
     if (tx == null) {
       return;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index 850f508..58596be 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -38,22 +38,27 @@ import javax.annotation.Nullable;
 public class CompactionState {
   private static final Log LOG = LogFactory.getLog(CompactionState.class);
 
+  private final TableName stateTable;
   private final byte[] regionName;
   private final String regionNameAsString;
-  private final TableName stateTable;
   private final DataJanitorState dataJanitorState;
+  private final long pruneFlushInterval;
   private volatile long pruneUpperBound = -1;
 
-  public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable) {
+  private PruneUpperBoundWriter pruneUpperBoundWriter;
+
+  public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long pruneFlushInterval) {
+    this.stateTable = stateTable;
     this.regionName = env.getRegionInfo().getRegionName();
     this.regionNameAsString = env.getRegionInfo().getRegionNameAsString();
-    this.stateTable = stateTable;
     this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public Table get() throws IOException {
         return env.getTable(stateTable);
       }
     });
+    this.pruneFlushInterval = pruneFlushInterval;
+    this.pruneUpperBoundWriter = createPruneUpperBoundWriter();
   }
 
   /**
@@ -75,18 +80,29 @@ public class CompactionState {
   }
 
   /**
+   * Stops the current {@link PruneUpperBoundWriter}.
+   */
+  public void stop() {
+    if (pruneUpperBoundWriter != null) {
+      pruneUpperBoundWriter.stop();
+    }
+  }
+
+  /**
    * Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}.
    * This method is called after the compaction has successfully completed.
    */
   public void persist() {
     if (pruneUpperBound != -1) {
-      try {
-        dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound);
-        LOG.debug(String.format("Saved prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
-      } catch (IOException e) {
-        LOG.warn(String.format("Cannot record prune upper bound in table %s after compacting region %s",
-                               stateTable, regionNameAsString), e);
+      if (!pruneUpperBoundWriter.isAlive()) {
+        pruneUpperBoundWriter = createPruneUpperBoundWriter();
       }
+      pruneUpperBoundWriter.persistPruneEntry(pruneUpperBound);
+      LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
     }
   }
+
+  private PruneUpperBoundWriter createPruneUpperBoundWriter() {
+    return new PruneUpperBoundWriter(dataJanitorState, stateTable, regionNameAsString, regionName, pruneFlushInterval);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
index c6d03c4..51dc181 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
@@ -46,8 +46,8 @@ import javax.annotation.Nullable;
 @SuppressWarnings("WeakerAccess")
 public class DataJanitorState {
   public static final byte[] FAMILY = {'f'};
+  public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
 
-  private static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
   private static final byte[] REGION_TIME_COL = {'r'};
   private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index 83e3948..99c514f 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -24,8 +24,10 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
@@ -119,10 +121,11 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
   public void initialize(Configuration conf) throws IOException {
     this.conf = conf;
     this.connection = ConnectionFactory.createConnection(conf);
-
+    
     final TableName stateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
                                                             TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
     LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
+    createPruneTable(stateTable);
     this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public Table get() throws IOException {
@@ -209,6 +212,38 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
     }
   }
 
+  /**
+   * Create the prune state table given the {@link TableName} if the table doesn't exist already.
+   *
+   * @param stateTable prune state table name
+   */
+  protected void createPruneTable(TableName stateTable) throws IOException {
+    try (Admin admin = this.connection.getAdmin()) {
+      if (admin.tableExists(stateTable)) {
+        LOG.debug("Not creating pruneStateTable {} since it already exists.",
+                  stateTable.getNameWithNamespaceInclAsString());
+        return;
+      }
+
+      HTableDescriptor htd = new HTableDescriptor(stateTable);
+      htd.addFamily(new HColumnDescriptor(DataJanitorState.FAMILY).setMaxVersions(1));
+      admin.createTable(htd);
+      LOG.info("Created pruneTable {}", stateTable.getNameWithNamespaceInclAsString());
+    } catch (TableExistsException ex) {
+      // Expected if the prune state table is being created at the same time by another client
+      LOG.debug("Not creating pruneStateTable {} since it already exists.",
+                stateTable.getNameWithNamespaceInclAsString(), ex);
+    }
+  }
+
+  /**
+   * Returns whether the table is a transactional table. By default, it is a table is identified as a transactional
+   * table if it has a the coprocessor {@link TransactionProcessor} attached to it. Should be overriden if the users
+   * attach a different coprocessor.
+   *
+   * @param tableDescriptor {@link HTableDescriptor} of the table
+   * @return true if the table is transactional
+   */
   protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
     return tableDescriptor.hasCoprocessor(TransactionProcessor.class.getName());
   }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
new file mode 100644
index 0000000..7bceaff
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Thread that will write the the prune upper bound
+ */
+public class PruneUpperBoundWriter {
+  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
+
+  private final TableName pruneStateTable;
+  private final DataJanitorState dataJanitorState;
+  private final byte[] regionName;
+  private final String regionNameAsString;
+  private final long pruneFlushInterval;
+  private final AtomicLong pruneUpperBound;
+  private final AtomicBoolean shouldFlush;
+
+  private Thread flushThread;
+  private long lastChecked;
+
+  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString,
+                               byte[] regionName, long pruneFlushInterval) {
+    this.pruneStateTable = pruneStateTable;
+    this.dataJanitorState = dataJanitorState;
+    this.regionName = regionName;
+    this.regionNameAsString = regionNameAsString;
+    this.pruneFlushInterval = pruneFlushInterval;
+    this.pruneUpperBound = new AtomicLong();
+    this.shouldFlush = new AtomicBoolean(false);
+    startFlushThread();
+  }
+
+  public boolean isAlive() {
+    return flushThread.isAlive();
+  }
+
+  public void persistPruneEntry(long pruneUpperBound) {
+    this.pruneUpperBound.set(pruneUpperBound);
+    this.shouldFlush.set(true);
+  }
+
+  public void stop() {
+    if (flushThread != null) {
+      flushThread.interrupt();
+    }
+  }
+
+  private void startFlushThread() {
+    flushThread = new Thread("tephra-prune-upper-bound-writer") {
+      @Override
+      public void run() {
+        while (!isInterrupted()) {
+          long now = System.currentTimeMillis();
+          if (now > (lastChecked + pruneFlushInterval)) {
+            if (shouldFlush.compareAndSet(true, false)) {
+              // should flush data
+              try {
+                dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound.get());
+              } catch (IOException ex) {
+                LOG.warn("Cannot record prune upper bound for region " + regionNameAsString + " in the table " +
+                           pruneStateTable.getNameWithNamespaceInclAsString() + " after compacting region.", ex);
+                // Retry again
+                shouldFlush.set(true);
+              }
+            }
+            lastChecked = now;
+          }
+
+          try {
+            TimeUnit.SECONDS.sleep(1);
+          } catch (InterruptedException ex) {
+            interrupt();
+            break;
+          }
+        }
+
+        LOG.info("PruneUpperBound Writer thread terminated.");
+      }
+    };
+
+    flushThread.setDaemon(true);
+    flushThread.start();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index 310c710..a431ee3 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -59,6 +59,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -78,6 +79,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     // Setup the configuration to start HBase cluster with the invalid list pruning enabled
     conf = HBaseConfiguration.create();
     conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
+    // Flush prune data to table quickly, so that tests don't need have to wait long to see updates
+    conf.setLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, 0L);
     AbstractHBaseTableTest.startMiniCluster();
 
     TransactionStateStorage txStateStorage = new InMemoryTransactionStateStorage();
@@ -135,6 +138,15 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     }
   }
 
+  private void truncatePruneStateTable() throws Exception {
+    if (hBaseAdmin.tableExists(pruneStateTable)) {
+      if (hBaseAdmin.isTableEnabled(pruneStateTable)) {
+        hBaseAdmin.disableTable(pruneStateTable);
+      }
+      hBaseAdmin.truncateTable(pruneStateTable, true);
+    }
+  }
+
   @Test
   public void testRecordCompactionState() throws Exception {
     DataJanitorState dataJanitorState =
@@ -145,6 +157,13 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
         }
       });
 
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
+    // Truncate prune state table to clear any data that might have been written by the previous test
+    // This is required because during the shutdown of the previous test, compaction might have kicked in and the
+    // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
+    truncatePruneStateTable();
+
     // No prune upper bound initially
     Assert.assertEquals(-1,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
@@ -155,17 +174,23 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
                               ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
     // Run minor compaction
     testUtil.compact(txDataTable1, false);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     // No prune upper bound after minor compaction too
     Assert.assertEquals(-1,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
 
     // Run major compaction, and verify prune upper bound
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     Assert.assertEquals(50,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
 
     // Run major compaction again with same snapshot, prune upper bound should not change
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     Assert.assertEquals(50,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
 
@@ -179,6 +204,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
 
     // Run major compaction again, now prune upper bound should change
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     Assert.assertEquals(104,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
   }
@@ -196,6 +223,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     // Run major compaction, and verify it completes
     long now = System.currentTimeMillis();
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
     Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime),
                       lastMajorCompactionTime >= now);
@@ -209,6 +238,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     // Run major compaction, and verify it completes
     long now = System.currentTimeMillis();
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
     Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime),
                       lastMajorCompactionTime >= now);
@@ -226,6 +257,14 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
 
     TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
     transactionPruningPlugin.initialize(conf);
+
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
+    // Truncate prune state table to clear any data that might have been written by the previous test
+    // This is required because during the shutdown of the previous test, compaction might have kicked in and the
+    // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
+    truncatePruneStateTable();
+
     try {
       // Run without a transaction snapshot first
       long now1 = 200;
@@ -270,6 +309,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
                           .add(getRegionName(txDataTable1, Bytes.toBytes(0)))
                           .build());
       testUtil.compact(txDataTable1, true);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
       long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
       Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
 


[2/2] incubator-tephra git commit: TEPHRA-212 Perform writes to prune state asynchronously Pass OperationWithAttributes to ensureValidTxLifetime Add creation of prune State Table

Posted by go...@apache.org.
TEPHRA-212 Perform writes to prune state asynchronously Pass OperationWithAttributes to ensureValidTxLifetime Add creation of prune State Table

This closes #29 from GitHub.

Signed-off-by: Gokul Gunasekaran <go...@cask.co>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/0016b203
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/0016b203
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/0016b203

Branch: refs/heads/master
Commit: 0016b2034d6f0e990fedf7c7961c3b4085480160
Parents: 87cb21a
Author: Gokul Gunasekaran <go...@cask.co>
Authored: Thu Jan 26 13:10:10 2017 -0800
Committer: Gokul Gunasekaran <go...@cask.co>
Committed: Wed Feb 1 11:34:00 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/tephra/TxConstants.java     |   7 ++
 .../hbase/coprocessor/TransactionProcessor.java |  25 +++--
 .../tephra/hbase/txprune/CompactionState.java   |  34 ++++--
 .../tephra/hbase/txprune/DataJanitorState.java  |   2 +-
 .../txprune/HBaseTransactionPruningPlugin.java  |  35 ++++++
 .../hbase/txprune/PruneUpperBoundWriter.java    | 112 +++++++++++++++++++
 .../hbase/txprune/InvalidListPruneTest.java     |  42 +++++++
 .../hbase/coprocessor/TransactionProcessor.java |  25 +++--
 .../tephra/hbase/txprune/CompactionState.java   |  34 ++++--
 .../tephra/hbase/txprune/DataJanitorState.java  |   2 +-
 .../txprune/HBaseTransactionPruningPlugin.java  |  35 ++++++
 .../hbase/txprune/PruneUpperBoundWriter.java    | 112 +++++++++++++++++++
 .../hbase/txprune/InvalidListPruneTest.java     |  41 +++++++
 .../hbase/coprocessor/TransactionProcessor.java |  25 +++--
 .../tephra/hbase/txprune/CompactionState.java   |  34 ++++--
 .../tephra/hbase/txprune/DataJanitorState.java  |   2 +-
 .../txprune/HBaseTransactionPruningPlugin.java  |  35 ++++++
 .../hbase/txprune/PruneUpperBoundWriter.java    | 111 ++++++++++++++++++
 .../hbase/txprune/InvalidListPruneTest.java     |  41 +++++++
 .../hbase/coprocessor/TransactionProcessor.java |  25 +++--
 .../tephra/hbase/txprune/CompactionState.java   |  34 ++++--
 .../tephra/hbase/txprune/DataJanitorState.java  |   2 +-
 .../txprune/HBaseTransactionPruningPlugin.java  |  35 ++++++
 .../hbase/txprune/PruneUpperBoundWriter.java    | 112 +++++++++++++++++++
 .../hbase/txprune/InvalidListPruneTest.java     |  41 +++++++
 .../hbase/coprocessor/TransactionProcessor.java |  25 +++--
 .../tephra/hbase/txprune/CompactionState.java   |  34 ++++--
 .../tephra/hbase/txprune/DataJanitorState.java  |   2 +-
 .../txprune/HBaseTransactionPruningPlugin.java  |  37 +++++-
 .../hbase/txprune/PruneUpperBoundWriter.java    | 111 ++++++++++++++++++
 .../hbase/txprune/InvalidListPruneTest.java     |  41 +++++++
 31 files changed, 1162 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
index 512e93c..1988abf 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
@@ -369,6 +369,12 @@ public class TxConstants {
      * Interval in seconds to schedule prune run.
      */
     public static final String PRUNE_INTERVAL = "data.tx.prune.interval";
+
+    /**
+     * Interval in seconds to schedule flush of prune table entries to store.
+     */
+    public static final String PRUNE_FLUSH_INTERVAL = "data.tx.prune.flush.interval";
+
     /**
      * Comma separated list of invalid transaction pruning plugins to load
      */
@@ -381,6 +387,7 @@ public class TxConstants {
     public static final boolean DEFAULT_PRUNE_ENABLE = false;
     public static final String DEFAULT_PRUNE_STATE_TABLE = "data_tx_janitor_state";
     public static final long DEFAULT_PRUNE_INTERVAL = TimeUnit.HOURS.toSeconds(6);
+    public static final long DEFAULT_PRUNE_FLUSH_INTERVAL = TimeUnit.MINUTES.toSeconds(1);
     public static final String DEFAULT_PLUGIN = "data.tx.prune.plugin.default";
     public static final String DEFAULT_PLUGIN_CLASS =
       "org.apache.tephra.hbase.txprune.HBaseTransactionPruningPlugin";

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index c42dd64..3d1c7f1 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -170,7 +170,9 @@ public class TransactionProcessor extends BaseRegionObserver {
 
   @Override
   public void stop(CoprocessorEnvironment e) throws IOException {
-    // nothing to do
+    if (compactionState != null) {
+      compactionState.stop();
+    }
   }
 
   @Override
@@ -191,7 +193,7 @@ public class TransactionProcessor extends BaseRegionObserver {
   public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)
     throws IOException {
     Transaction tx = getFromOperation(put);
-    ensureValidTxLifetime(e.getEnvironment(), tx);
+    ensureValidTxLifetime(e.getEnvironment(), put, tx);
   }
 
   @Override
@@ -208,7 +210,7 @@ public class TransactionProcessor extends BaseRegionObserver {
     }
 
     Transaction tx = getFromOperation(delete);
-    ensureValidTxLifetime(e.getEnvironment(), tx);
+    ensureValidTxLifetime(e.getEnvironment(), delete, tx);
 
     // Other deletes are client-initiated and need to be translated into our own tombstones
     // TODO: this should delegate to the DeleteStrategy implementation.
@@ -322,11 +324,16 @@ public class TransactionProcessor extends BaseRegionObserver {
       if (conf != null) {
         pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
                                       TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
-        String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
-        compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable));
-        LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " +
-                    pruneTable);
+        if (Boolean.TRUE.equals(pruneEnable)) {
+          String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                       TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+          long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
+            conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+                         TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+          compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+          LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+                      + pruneTable);
+        }
       }
     }
 
@@ -390,11 +397,13 @@ public class TransactionProcessor extends BaseRegionObserver {
    * Make sure that the transaction is within the max valid transaction lifetime.
    *
    * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated
+   * @param op {@link OperationWithAttributes} HBase operation to access its attributes if required
    * @param tx {@link Transaction} supplied by the
    * @throws DoNotRetryIOException thrown if the transaction is older than the max lifetime of a transaction
    *         IOException throw if the value of max lifetime of transaction is unavailable
    */
   protected void ensureValidTxLifetime(RegionCoprocessorEnvironment env,
+                                       @SuppressWarnings("unused") OperationWithAttributes op,
                                        @Nullable Transaction tx) throws IOException {
     if (tx == null) {
       return;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index 733f636..f4f1d43 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -38,22 +38,27 @@ import javax.annotation.Nullable;
 public class CompactionState {
   private static final Log LOG = LogFactory.getLog(CompactionState.class);
 
+  private final TableName stateTable;
   private final byte[] regionName;
   private final String regionNameAsString;
-  private final TableName stateTable;
   private final DataJanitorState dataJanitorState;
+  private final long pruneFlushInterval;
   private volatile long pruneUpperBound = -1;
 
-  public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable) {
+  private PruneUpperBoundWriter pruneUpperBoundWriter;
+
+  public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long pruneFlushInterval) {
+    this.stateTable = stateTable;
     this.regionName = env.getRegion().getRegionName();
     this.regionNameAsString = env.getRegion().getRegionNameAsString();
-    this.stateTable = stateTable;
     this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public HTableInterface get() throws IOException {
         return env.getTable(stateTable);
       }
     });
+    this.pruneFlushInterval = pruneFlushInterval;
+    this.pruneUpperBoundWriter = createPruneUpperBoundWriter();
   }
 
   /**
@@ -75,18 +80,29 @@ public class CompactionState {
   }
 
   /**
+   * Stops the current {@link PruneUpperBoundWriter}.
+   */
+  public void stop() {
+    if (pruneUpperBoundWriter != null) {
+      pruneUpperBoundWriter.stop();
+    }
+  }
+
+  /**
    * Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}.
    * This method is called after the compaction has successfully completed.
    */
   public void persist() {
     if (pruneUpperBound != -1) {
-      try {
-        dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound);
-        LOG.debug(String.format("Saved prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
-      } catch (IOException e) {
-        LOG.warn(String.format("Cannot record prune upper bound in table %s after compacting region %s",
-                               stateTable, regionNameAsString), e);
+      if (!pruneUpperBoundWriter.isAlive()) {
+        pruneUpperBoundWriter = createPruneUpperBoundWriter();
       }
+      pruneUpperBoundWriter.persistPruneEntry(pruneUpperBound);
+      LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
     }
   }
+
+  private PruneUpperBoundWriter createPruneUpperBoundWriter() {
+    return new PruneUpperBoundWriter(dataJanitorState, stateTable, regionNameAsString, regionName, pruneFlushInterval);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
index bde843b..5817fe2 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
@@ -46,8 +46,8 @@ import javax.annotation.Nullable;
 @SuppressWarnings("WeakerAccess")
 public class DataJanitorState {
   public static final byte[] FAMILY = {'f'};
+  public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
 
-  private static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
   private static final byte[] REGION_TIME_COL = {'r'};
   private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index 44dadc3..80da8d8 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -24,8 +24,10 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
@@ -126,6 +128,7 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
                                                             TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
     LOG.info("Initializing plugin with state table {}:{}", stateTable.getNamespaceAsString(),
              stateTable.getNameAsString());
+    createPruneTable(stateTable);
     this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public HTableInterface get() throws IOException {
@@ -218,6 +221,38 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
     }
   }
 
+  /**
+   * Create the prune state table given the {@link TableName} if the table doesn't exist already.
+   *
+   * @param stateTable prune state table name
+   */
+  protected void createPruneTable(TableName stateTable) throws IOException {
+    try {
+      if (hBaseAdmin.tableExists(stateTable)) {
+        LOG.debug("Not creating pruneStateTable {}:{} since it already exists.",
+                  stateTable.getNamespaceAsString(), stateTable.getNameAsString());
+        return;
+      }
+
+      HTableDescriptor htd = new HTableDescriptor(stateTable);
+      htd.addFamily(new HColumnDescriptor(DataJanitorState.FAMILY).setMaxVersions(1));
+      hBaseAdmin.createTable(htd);
+      LOG.info("Created pruneTable {}:{}", stateTable.getNamespaceAsString(), stateTable.getNameAsString());
+    } catch (TableExistsException ex) {
+      // Expected if the prune state table is being created at the same time by another client
+      LOG.debug("Not creating pruneStateTable {}:{} since it already exists.",
+                stateTable.getNamespaceAsString(), stateTable.getNameAsString(), ex);
+    }
+  }
+
+  /**
+   * Returns whether the table is a transactional table. By default, it is a table is identified as a transactional
+   * table if it has a the coprocessor {@link TransactionProcessor} attached to it. Should be overriden if the users
+   * attach a different coprocessor.
+   *
+   * @param tableDescriptor {@link HTableDescriptor} of the table
+   * @return true if the table is transactional
+   */
   protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
     return tableDescriptor.hasCoprocessor(TransactionProcessor.class.getName());
   }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
new file mode 100644
index 0000000..c981e15
--- /dev/null
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Thread that will write the the prune upper bound
+ */
+public class PruneUpperBoundWriter {
+  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
+
+  private final TableName pruneStateTable;
+  private final DataJanitorState dataJanitorState;
+  private final byte[] regionName;
+  private final String regionNameAsString;
+  private final long pruneFlushInterval;
+  private final AtomicLong pruneUpperBound;
+  private final AtomicBoolean shouldFlush;
+
+  private Thread flushThread;
+  private long lastChecked;
+
+  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString,
+                               byte[] regionName, long pruneFlushInterval) {
+    this.pruneStateTable = pruneStateTable;
+    this.dataJanitorState = dataJanitorState;
+    this.regionName = regionName;
+    this.regionNameAsString = regionNameAsString;
+    this.pruneFlushInterval = pruneFlushInterval;
+    this.pruneUpperBound = new AtomicLong();
+    this.shouldFlush = new AtomicBoolean(false);
+    startFlushThread();
+  }
+
+  public boolean isAlive() {
+    return flushThread.isAlive();
+  }
+
+  public void persistPruneEntry(long pruneUpperBound) {
+    this.pruneUpperBound.set(pruneUpperBound);
+    this.shouldFlush.set(true);
+  }
+
+  public void stop() {
+    if (flushThread != null) {
+      flushThread.interrupt();
+    }
+  }
+
+  private void startFlushThread() {
+    flushThread = new Thread("tephra-prune-upper-bound-writer") {
+      @Override
+      public void run() {
+        while (!isInterrupted()) {
+          long now = System.currentTimeMillis();
+          if (now > (lastChecked + pruneFlushInterval)) {
+            if (shouldFlush.compareAndSet(true, false)) {
+              // should flush data
+              try {
+                dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound.get());
+              } catch (IOException ex) {
+                LOG.warn("Cannot record prune upper bound for region " + regionNameAsString + " in the table " +
+                           pruneStateTable.getNamespaceAsString() + ":" + pruneStateTable.getNameAsString() +
+                           " after compacting region.", ex);
+                // Retry again
+                shouldFlush.set(true);
+              }
+            }
+            lastChecked = now;
+          }
+
+          try {
+            TimeUnit.SECONDS.sleep(1);
+          } catch (InterruptedException ex) {
+            interrupt();
+            break;
+          }
+        }
+
+        LOG.info("PruneUpperBound Writer thread terminated.");
+      }
+    };
+
+    flushThread.setDaemon(true);
+    flushThread.start();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index 197a6e2..fbd4d7d 100644
--- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -61,6 +61,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -82,6 +83,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     // Setup the configuration to start HBase cluster with the invalid list pruning enabled
     conf = HBaseConfiguration.create();
     conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
+    // Flush prune data to table quickly, so that tests don't need have to wait long to see updates
+    conf.setLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, 0L);
     AbstractHBaseTableTest.startMiniCluster();
 
     TransactionStateStorage txStateStorage = new InMemoryTransactionStateStorage();
@@ -141,6 +144,17 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     }
   }
 
+  private void truncatePruneStateTable() throws Exception {
+    if (hBaseAdmin.tableExists(pruneStateTable)) {
+      if (hBaseAdmin.isTableEnabled(pruneStateTable)) {
+        hBaseAdmin.disableTable(pruneStateTable);
+      }
+      HTableDescriptor htd = hBaseAdmin.getTableDescriptor(pruneStateTable);
+      hBaseAdmin.deleteTable(pruneStateTable);
+      hBaseAdmin.createTable(htd);
+    }
+  }
+
   @Test
   public void testRecordCompactionState() throws Exception {
     DataJanitorState dataJanitorState =
@@ -151,6 +165,13 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
         }
       });
 
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
+    // Truncate prune state table to clear any data that might have been written by the previous test
+    // This is required because during the shutdown of the previous test, compaction might have kicked in and the
+    // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
+    truncatePruneStateTable();
+
     // No prune upper bound initially
     Assert.assertEquals(-1,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
@@ -161,17 +182,23 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
                               ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
     // Run minor compaction
     testUtil.compact(txDataTable1, false);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     // No prune upper bound after minor compaction too
     Assert.assertEquals(-1,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
 
     // Run major compaction, and verify prune upper bound
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     Assert.assertEquals(50,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
 
     // Run major compaction again with same snapshot, prune upper bound should not change
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     Assert.assertEquals(50,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
 
@@ -185,6 +212,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
 
     // Run major compaction again, now prune upper bound should change
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     Assert.assertEquals(104,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
   }
@@ -202,6 +231,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     // Run major compaction, and verify it completes
     long now = System.currentTimeMillis();
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
     Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime),
                       lastMajorCompactionTime >= now);
@@ -215,6 +246,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     // Run major compaction, and verify it completes
     long now = System.currentTimeMillis();
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
     Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime),
                       lastMajorCompactionTime >= now);
@@ -230,6 +263,13 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
         }
       });
 
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
+    // Truncate prune state table to clear any data that might have been written by the previous test
+    // This is required because during the shutdown of the previous test, compaction might have kicked in and the
+    // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
+    truncatePruneStateTable();
+
     TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
     transactionPruningPlugin.initialize(conf);
     try {
@@ -276,6 +316,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
                           .add(getRegionName(txDataTable1, Bytes.toBytes(0)))
                           .build());
       testUtil.compact(txDataTable1, true);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
       long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
       Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 19eb09e..728adfa 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -170,7 +170,9 @@ public class TransactionProcessor extends BaseRegionObserver {
 
   @Override
   public void stop(CoprocessorEnvironment e) throws IOException {
-    // nothing to do
+    if (compactionState != null) {
+      compactionState.stop();
+    }
   }
 
   @Override
@@ -191,7 +193,7 @@ public class TransactionProcessor extends BaseRegionObserver {
   public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)
     throws IOException {
     Transaction tx = getFromOperation(put);
-    ensureValidTxLifetime(e.getEnvironment(), tx);
+    ensureValidTxLifetime(e.getEnvironment(), put, tx);
   }
 
   @Override
@@ -208,7 +210,7 @@ public class TransactionProcessor extends BaseRegionObserver {
     }
 
     Transaction tx = getFromOperation(delete);
-    ensureValidTxLifetime(e.getEnvironment(), tx);
+    ensureValidTxLifetime(e.getEnvironment(), delete, tx);
 
     // Other deletes are client-initiated and need to be translated into our own tombstones
     // TODO: this should delegate to the DeleteStrategy implementation.
@@ -322,11 +324,16 @@ public class TransactionProcessor extends BaseRegionObserver {
       if (conf != null) {
         pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
                                       TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
-        String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
-        compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable));
-        LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " +
-                    pruneTable);
+        if (Boolean.TRUE.equals(pruneEnable)) {
+          String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                       TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+          long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
+            conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+                         TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+          compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+          LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+                      + pruneTable);
+        }
       }
     }
 
@@ -390,11 +397,13 @@ public class TransactionProcessor extends BaseRegionObserver {
    * Make sure that the transaction is within the max valid transaction lifetime.
    *
    * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated
+   * @param op {@link OperationWithAttributes} HBase operation to access its attributes if required
    * @param tx {@link Transaction} supplied by the
    * @throws DoNotRetryIOException thrown if the transaction is older than the max lifetime of a transaction
    *         IOException throw if the value of max lifetime of transaction is unavailable
    */
   protected void ensureValidTxLifetime(RegionCoprocessorEnvironment env,
+                                       @SuppressWarnings("unused") OperationWithAttributes op,
                                        @Nullable Transaction tx) throws IOException {
     if (tx == null) {
       return;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index 38a79d6..a1a59ad 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -38,22 +38,27 @@ import javax.annotation.Nullable;
 public class CompactionState {
   private static final Log LOG = LogFactory.getLog(CompactionState.class);
 
+  private final TableName stateTable;
   private final byte[] regionName;
   private final String regionNameAsString;
-  private final TableName stateTable;
   private final DataJanitorState dataJanitorState;
+  private final long pruneFlushInterval;
   private volatile long pruneUpperBound = -1;
 
-  public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable) {
+  private PruneUpperBoundWriter pruneUpperBoundWriter;
+
+  public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long pruneFlushInterval) {
+    this.stateTable = stateTable;
     this.regionName = env.getRegionInfo().getRegionName();
     this.regionNameAsString = env.getRegionInfo().getRegionNameAsString();
-    this.stateTable = stateTable;
     this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public HTableInterface get() throws IOException {
         return env.getTable(stateTable);
       }
     });
+    this.pruneFlushInterval = pruneFlushInterval;
+    this.pruneUpperBoundWriter = createPruneUpperBoundWriter();
   }
 
   /**
@@ -75,18 +80,29 @@ public class CompactionState {
   }
 
   /**
+   * Stops the current {@link PruneUpperBoundWriter}.
+   */
+  public void stop() {
+    if (pruneUpperBoundWriter != null) {
+      pruneUpperBoundWriter.stop();
+    }
+  }
+
+  /**
    * Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}.
    * This method is called after the compaction has successfully completed.
    */
   public void persist() {
     if (pruneUpperBound != -1) {
-      try {
-        dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound);
-        LOG.debug(String.format("Saved prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
-      } catch (IOException e) {
-        LOG.warn(String.format("Cannot record prune upper bound in table %s after compacting region %s",
-                               stateTable, regionNameAsString), e);
+      if (!pruneUpperBoundWriter.isAlive()) {
+        pruneUpperBoundWriter = createPruneUpperBoundWriter();
       }
+      pruneUpperBoundWriter.persistPruneEntry(pruneUpperBound);
+      LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
     }
   }
+
+  private PruneUpperBoundWriter createPruneUpperBoundWriter() {
+    return new PruneUpperBoundWriter(dataJanitorState, stateTable, regionNameAsString, regionName, pruneFlushInterval);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
index bde843b..5817fe2 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
@@ -46,8 +46,8 @@ import javax.annotation.Nullable;
 @SuppressWarnings("WeakerAccess")
 public class DataJanitorState {
   public static final byte[] FAMILY = {'f'};
+  public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
 
-  private static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
   private static final byte[] REGION_TIME_COL = {'r'};
   private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index 44dadc3..80da8d8 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -24,8 +24,10 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
@@ -126,6 +128,7 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
                                                             TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
     LOG.info("Initializing plugin with state table {}:{}", stateTable.getNamespaceAsString(),
              stateTable.getNameAsString());
+    createPruneTable(stateTable);
     this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public HTableInterface get() throws IOException {
@@ -218,6 +221,38 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
     }
   }
 
+  /**
+   * Create the prune state table given the {@link TableName} if the table doesn't exist already.
+   *
+   * @param stateTable prune state table name
+   */
+  protected void createPruneTable(TableName stateTable) throws IOException {
+    try {
+      if (hBaseAdmin.tableExists(stateTable)) {
+        LOG.debug("Not creating pruneStateTable {}:{} since it already exists.",
+                  stateTable.getNamespaceAsString(), stateTable.getNameAsString());
+        return;
+      }
+
+      HTableDescriptor htd = new HTableDescriptor(stateTable);
+      htd.addFamily(new HColumnDescriptor(DataJanitorState.FAMILY).setMaxVersions(1));
+      hBaseAdmin.createTable(htd);
+      LOG.info("Created pruneTable {}:{}", stateTable.getNamespaceAsString(), stateTable.getNameAsString());
+    } catch (TableExistsException ex) {
+      // Expected if the prune state table is being created at the same time by another client
+      LOG.debug("Not creating pruneStateTable {}:{} since it already exists.",
+                stateTable.getNamespaceAsString(), stateTable.getNameAsString(), ex);
+    }
+  }
+
+  /**
+   * Returns whether the table is a transactional table. By default, it is a table is identified as a transactional
+   * table if it has a the coprocessor {@link TransactionProcessor} attached to it. Should be overriden if the users
+   * attach a different coprocessor.
+   *
+   * @param tableDescriptor {@link HTableDescriptor} of the table
+   * @return true if the table is transactional
+   */
   protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
     return tableDescriptor.hasCoprocessor(TransactionProcessor.class.getName());
   }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
new file mode 100644
index 0000000..c981e15
--- /dev/null
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Thread that will write the the prune upper bound
+ */
+public class PruneUpperBoundWriter {
+  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
+
+  private final TableName pruneStateTable;
+  private final DataJanitorState dataJanitorState;
+  private final byte[] regionName;
+  private final String regionNameAsString;
+  private final long pruneFlushInterval;
+  private final AtomicLong pruneUpperBound;
+  private final AtomicBoolean shouldFlush;
+
+  private Thread flushThread;
+  private long lastChecked;
+
+  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString,
+                               byte[] regionName, long pruneFlushInterval) {
+    this.pruneStateTable = pruneStateTable;
+    this.dataJanitorState = dataJanitorState;
+    this.regionName = regionName;
+    this.regionNameAsString = regionNameAsString;
+    this.pruneFlushInterval = pruneFlushInterval;
+    this.pruneUpperBound = new AtomicLong();
+    this.shouldFlush = new AtomicBoolean(false);
+    startFlushThread();
+  }
+
+  public boolean isAlive() {
+    return flushThread.isAlive();
+  }
+
+  public void persistPruneEntry(long pruneUpperBound) {
+    this.pruneUpperBound.set(pruneUpperBound);
+    this.shouldFlush.set(true);
+  }
+
+  public void stop() {
+    if (flushThread != null) {
+      flushThread.interrupt();
+    }
+  }
+
+  private void startFlushThread() {
+    flushThread = new Thread("tephra-prune-upper-bound-writer") {
+      @Override
+      public void run() {
+        while (!isInterrupted()) {
+          long now = System.currentTimeMillis();
+          if (now > (lastChecked + pruneFlushInterval)) {
+            if (shouldFlush.compareAndSet(true, false)) {
+              // should flush data
+              try {
+                dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound.get());
+              } catch (IOException ex) {
+                LOG.warn("Cannot record prune upper bound for region " + regionNameAsString + " in the table " +
+                           pruneStateTable.getNamespaceAsString() + ":" + pruneStateTable.getNameAsString() +
+                           " after compacting region.", ex);
+                // Retry again
+                shouldFlush.set(true);
+              }
+            }
+            lastChecked = now;
+          }
+
+          try {
+            TimeUnit.SECONDS.sleep(1);
+          } catch (InterruptedException ex) {
+            interrupt();
+            break;
+          }
+        }
+
+        LOG.info("PruneUpperBound Writer thread terminated.");
+      }
+    };
+
+    flushThread.setDaemon(true);
+    flushThread.start();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index 197a6e2..37f732c 100644
--- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -61,6 +61,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -82,6 +83,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     // Setup the configuration to start HBase cluster with the invalid list pruning enabled
     conf = HBaseConfiguration.create();
     conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
+    // Flush prune data to table quickly, so that tests don't need have to wait long to see updates
+    conf.setLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, 0L);
     AbstractHBaseTableTest.startMiniCluster();
 
     TransactionStateStorage txStateStorage = new InMemoryTransactionStateStorage();
@@ -141,6 +144,15 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     }
   }
 
+  private void truncatePruneStateTable() throws Exception {
+    if (hBaseAdmin.tableExists(pruneStateTable)) {
+      if (hBaseAdmin.isTableEnabled(pruneStateTable)) {
+        hBaseAdmin.disableTable(pruneStateTable);
+      }
+      hBaseAdmin.truncateTable(pruneStateTable, true);
+    }
+  }
+
   @Test
   public void testRecordCompactionState() throws Exception {
     DataJanitorState dataJanitorState =
@@ -151,6 +163,13 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
         }
       });
 
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
+    // Truncate prune state table to clear any data that might have been written by the previous test
+    // This is required because during the shutdown of the previous test, compaction might have kicked in and the
+    // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
+    truncatePruneStateTable();
+
     // No prune upper bound initially
     Assert.assertEquals(-1,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
@@ -161,17 +180,23 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
                               ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
     // Run minor compaction
     testUtil.compact(txDataTable1, false);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     // No prune upper bound after minor compaction too
     Assert.assertEquals(-1,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
 
     // Run major compaction, and verify prune upper bound
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     Assert.assertEquals(50,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
 
     // Run major compaction again with same snapshot, prune upper bound should not change
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     Assert.assertEquals(50,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
 
@@ -185,6 +210,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
 
     // Run major compaction again, now prune upper bound should change
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     Assert.assertEquals(104,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
   }
@@ -202,6 +229,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     // Run major compaction, and verify it completes
     long now = System.currentTimeMillis();
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
     Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime),
                       lastMajorCompactionTime >= now);
@@ -215,6 +244,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     // Run major compaction, and verify it completes
     long now = System.currentTimeMillis();
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
     Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime),
                       lastMajorCompactionTime >= now);
@@ -232,6 +263,14 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
 
     TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
     transactionPruningPlugin.initialize(conf);
+
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
+    // Truncate prune state table to clear any data that might have been written by the previous test
+    // This is required because during the shutdown of the previous test, compaction might have kicked in and the
+    // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
+    truncatePruneStateTable();
+
     try {
       // Run without a transaction snapshot first
       long now1 = 200;
@@ -276,6 +315,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
                           .add(getRegionName(txDataTable1, Bytes.toBytes(0)))
                           .build());
       testUtil.compact(txDataTable1, true);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
       long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
       Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 19eb09e..728adfa 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -170,7 +170,9 @@ public class TransactionProcessor extends BaseRegionObserver {
 
   @Override
   public void stop(CoprocessorEnvironment e) throws IOException {
-    // nothing to do
+    if (compactionState != null) {
+      compactionState.stop();
+    }
   }
 
   @Override
@@ -191,7 +193,7 @@ public class TransactionProcessor extends BaseRegionObserver {
   public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)
     throws IOException {
     Transaction tx = getFromOperation(put);
-    ensureValidTxLifetime(e.getEnvironment(), tx);
+    ensureValidTxLifetime(e.getEnvironment(), put, tx);
   }
 
   @Override
@@ -208,7 +210,7 @@ public class TransactionProcessor extends BaseRegionObserver {
     }
 
     Transaction tx = getFromOperation(delete);
-    ensureValidTxLifetime(e.getEnvironment(), tx);
+    ensureValidTxLifetime(e.getEnvironment(), delete, tx);
 
     // Other deletes are client-initiated and need to be translated into our own tombstones
     // TODO: this should delegate to the DeleteStrategy implementation.
@@ -322,11 +324,16 @@ public class TransactionProcessor extends BaseRegionObserver {
       if (conf != null) {
         pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
                                       TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
-        String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
-        compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable));
-        LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " +
-                    pruneTable);
+        if (Boolean.TRUE.equals(pruneEnable)) {
+          String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                       TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+          long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
+            conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+                         TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+          compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+          LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+                      + pruneTable);
+        }
       }
     }
 
@@ -390,11 +397,13 @@ public class TransactionProcessor extends BaseRegionObserver {
    * Make sure that the transaction is within the max valid transaction lifetime.
    *
    * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated
+   * @param op {@link OperationWithAttributes} HBase operation to access its attributes if required
    * @param tx {@link Transaction} supplied by the
    * @throws DoNotRetryIOException thrown if the transaction is older than the max lifetime of a transaction
    *         IOException throw if the value of max lifetime of transaction is unavailable
    */
   protected void ensureValidTxLifetime(RegionCoprocessorEnvironment env,
+                                       @SuppressWarnings("unused") OperationWithAttributes op,
                                        @Nullable Transaction tx) throws IOException {
     if (tx == null) {
       return;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index 850f508..58596be 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -38,22 +38,27 @@ import javax.annotation.Nullable;
 public class CompactionState {
   private static final Log LOG = LogFactory.getLog(CompactionState.class);
 
+  private final TableName stateTable;
   private final byte[] regionName;
   private final String regionNameAsString;
-  private final TableName stateTable;
   private final DataJanitorState dataJanitorState;
+  private final long pruneFlushInterval;
   private volatile long pruneUpperBound = -1;
 
-  public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable) {
+  private PruneUpperBoundWriter pruneUpperBoundWriter;
+
+  public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long pruneFlushInterval) {
+    this.stateTable = stateTable;
     this.regionName = env.getRegionInfo().getRegionName();
     this.regionNameAsString = env.getRegionInfo().getRegionNameAsString();
-    this.stateTable = stateTable;
     this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public Table get() throws IOException {
         return env.getTable(stateTable);
       }
     });
+    this.pruneFlushInterval = pruneFlushInterval;
+    this.pruneUpperBoundWriter = createPruneUpperBoundWriter();
   }
 
   /**
@@ -75,18 +80,29 @@ public class CompactionState {
   }
 
   /**
+   * Stops the current {@link PruneUpperBoundWriter}.
+   */
+  public void stop() {
+    if (pruneUpperBoundWriter != null) {
+      pruneUpperBoundWriter.stop();
+    }
+  }
+
+  /**
    * Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}.
    * This method is called after the compaction has successfully completed.
    */
   public void persist() {
     if (pruneUpperBound != -1) {
-      try {
-        dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound);
-        LOG.debug(String.format("Saved prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
-      } catch (IOException e) {
-        LOG.warn(String.format("Cannot record prune upper bound in table %s after compacting region %s",
-                               stateTable, regionNameAsString), e);
+      if (!pruneUpperBoundWriter.isAlive()) {
+        pruneUpperBoundWriter = createPruneUpperBoundWriter();
       }
+      pruneUpperBoundWriter.persistPruneEntry(pruneUpperBound);
+      LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
     }
   }
+
+  private PruneUpperBoundWriter createPruneUpperBoundWriter() {
+    return new PruneUpperBoundWriter(dataJanitorState, stateTable, regionNameAsString, regionName, pruneFlushInterval);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
index c6d03c4..51dc181 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
@@ -46,8 +46,8 @@ import javax.annotation.Nullable;
 @SuppressWarnings("WeakerAccess")
 public class DataJanitorState {
   public static final byte[] FAMILY = {'f'};
+  public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
 
-  private static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
   private static final byte[] REGION_TIME_COL = {'r'};
   private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index 83e3948..95216b9 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -24,8 +24,10 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
@@ -123,6 +125,7 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
     final TableName stateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
                                                             TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
     LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
+    createPruneTable(stateTable);
     this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public Table get() throws IOException {
@@ -209,6 +212,38 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
     }
   }
 
+  /**
+   * Create the prune state table given the {@link TableName} if the table doesn't exist already.
+   *
+   * @param stateTable prune state table name
+   */
+  protected void createPruneTable(TableName stateTable) throws IOException {
+    try (Admin admin = this.connection.getAdmin()) {
+      if (admin.tableExists(stateTable)) {
+        LOG.debug("Not creating pruneStateTable {} since it already exists.",
+                  stateTable.getNameWithNamespaceInclAsString());
+        return;
+      }
+
+      HTableDescriptor htd = new HTableDescriptor(stateTable);
+      htd.addFamily(new HColumnDescriptor(DataJanitorState.FAMILY).setMaxVersions(1));
+      admin.createTable(htd);
+      LOG.info("Created pruneTable {}", stateTable.getNameWithNamespaceInclAsString());
+    } catch (TableExistsException ex) {
+      // Expected if the prune state table is being created at the same time by another client
+      LOG.debug("Not creating pruneStateTable {} since it already exists.",
+                stateTable.getNameWithNamespaceInclAsString(), ex);
+    }
+  }
+
+  /**
+   * Returns whether the table is a transactional table. By default, it is a table is identified as a transactional
+   * table if it has a the coprocessor {@link TransactionProcessor} attached to it. Should be overriden if the users
+   * attach a different coprocessor.
+   *
+   * @param tableDescriptor {@link HTableDescriptor} of the table
+   * @return true if the table is transactional
+   */
   protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
     return tableDescriptor.hasCoprocessor(TransactionProcessor.class.getName());
   }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
new file mode 100644
index 0000000..7bceaff
--- /dev/null
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Thread that will write the the prune upper bound
+ */
+public class PruneUpperBoundWriter {
+  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
+
+  private final TableName pruneStateTable;
+  private final DataJanitorState dataJanitorState;
+  private final byte[] regionName;
+  private final String regionNameAsString;
+  private final long pruneFlushInterval;
+  private final AtomicLong pruneUpperBound;
+  private final AtomicBoolean shouldFlush;
+
+  private Thread flushThread;
+  private long lastChecked;
+
+  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString,
+                               byte[] regionName, long pruneFlushInterval) {
+    this.pruneStateTable = pruneStateTable;
+    this.dataJanitorState = dataJanitorState;
+    this.regionName = regionName;
+    this.regionNameAsString = regionNameAsString;
+    this.pruneFlushInterval = pruneFlushInterval;
+    this.pruneUpperBound = new AtomicLong();
+    this.shouldFlush = new AtomicBoolean(false);
+    startFlushThread();
+  }
+
+  public boolean isAlive() {
+    return flushThread.isAlive();
+  }
+
+  public void persistPruneEntry(long pruneUpperBound) {
+    this.pruneUpperBound.set(pruneUpperBound);
+    this.shouldFlush.set(true);
+  }
+
+  public void stop() {
+    if (flushThread != null) {
+      flushThread.interrupt();
+    }
+  }
+
+  private void startFlushThread() {
+    flushThread = new Thread("tephra-prune-upper-bound-writer") {
+      @Override
+      public void run() {
+        while (!isInterrupted()) {
+          long now = System.currentTimeMillis();
+          if (now > (lastChecked + pruneFlushInterval)) {
+            if (shouldFlush.compareAndSet(true, false)) {
+              // should flush data
+              try {
+                dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound.get());
+              } catch (IOException ex) {
+                LOG.warn("Cannot record prune upper bound for region " + regionNameAsString + " in the table " +
+                           pruneStateTable.getNameWithNamespaceInclAsString() + " after compacting region.", ex);
+                // Retry again
+                shouldFlush.set(true);
+              }
+            }
+            lastChecked = now;
+          }
+
+          try {
+            TimeUnit.SECONDS.sleep(1);
+          } catch (InterruptedException ex) {
+            interrupt();
+            break;
+          }
+        }
+
+        LOG.info("PruneUpperBound Writer thread terminated.");
+      }
+    };
+
+    flushThread.setDaemon(true);
+    flushThread.start();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index 310c710..a431ee3 100644
--- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -59,6 +59,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -78,6 +79,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     // Setup the configuration to start HBase cluster with the invalid list pruning enabled
     conf = HBaseConfiguration.create();
     conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
+    // Flush prune data to table quickly, so that tests don't need have to wait long to see updates
+    conf.setLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, 0L);
     AbstractHBaseTableTest.startMiniCluster();
 
     TransactionStateStorage txStateStorage = new InMemoryTransactionStateStorage();
@@ -135,6 +138,15 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     }
   }
 
+  private void truncatePruneStateTable() throws Exception {
+    if (hBaseAdmin.tableExists(pruneStateTable)) {
+      if (hBaseAdmin.isTableEnabled(pruneStateTable)) {
+        hBaseAdmin.disableTable(pruneStateTable);
+      }
+      hBaseAdmin.truncateTable(pruneStateTable, true);
+    }
+  }
+
   @Test
   public void testRecordCompactionState() throws Exception {
     DataJanitorState dataJanitorState =
@@ -145,6 +157,13 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
         }
       });
 
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
+    // Truncate prune state table to clear any data that might have been written by the previous test
+    // This is required because during the shutdown of the previous test, compaction might have kicked in and the
+    // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
+    truncatePruneStateTable();
+
     // No prune upper bound initially
     Assert.assertEquals(-1,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
@@ -155,17 +174,23 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
                               ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
     // Run minor compaction
     testUtil.compact(txDataTable1, false);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     // No prune upper bound after minor compaction too
     Assert.assertEquals(-1,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
 
     // Run major compaction, and verify prune upper bound
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     Assert.assertEquals(50,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
 
     // Run major compaction again with same snapshot, prune upper bound should not change
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     Assert.assertEquals(50,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
 
@@ -179,6 +204,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
 
     // Run major compaction again, now prune upper bound should change
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     Assert.assertEquals(104,
                         dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
   }
@@ -196,6 +223,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     // Run major compaction, and verify it completes
     long now = System.currentTimeMillis();
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
     Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime),
                       lastMajorCompactionTime >= now);
@@ -209,6 +238,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
     // Run major compaction, and verify it completes
     long now = System.currentTimeMillis();
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
     Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime),
                       lastMajorCompactionTime >= now);
@@ -226,6 +257,14 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
 
     TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
     transactionPruningPlugin.initialize(conf);
+
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
+    // Truncate prune state table to clear any data that might have been written by the previous test
+    // This is required because during the shutdown of the previous test, compaction might have kicked in and the
+    // coprocessor still had some data to flush and it might be flushed at the beginning of this test.
+    truncatePruneStateTable();
+
     try {
       // Run without a transaction snapshot first
       long now1 = 200;
@@ -270,6 +309,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
                           .add(getRegionName(txDataTable1, Bytes.toBytes(0)))
                           .build());
       testUtil.compact(txDataTable1, true);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
       long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
       Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 19eb09e..728adfa 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -170,7 +170,9 @@ public class TransactionProcessor extends BaseRegionObserver {
 
   @Override
   public void stop(CoprocessorEnvironment e) throws IOException {
-    // nothing to do
+    if (compactionState != null) {
+      compactionState.stop();
+    }
   }
 
   @Override
@@ -191,7 +193,7 @@ public class TransactionProcessor extends BaseRegionObserver {
   public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)
     throws IOException {
     Transaction tx = getFromOperation(put);
-    ensureValidTxLifetime(e.getEnvironment(), tx);
+    ensureValidTxLifetime(e.getEnvironment(), put, tx);
   }
 
   @Override
@@ -208,7 +210,7 @@ public class TransactionProcessor extends BaseRegionObserver {
     }
 
     Transaction tx = getFromOperation(delete);
-    ensureValidTxLifetime(e.getEnvironment(), tx);
+    ensureValidTxLifetime(e.getEnvironment(), delete, tx);
 
     // Other deletes are client-initiated and need to be translated into our own tombstones
     // TODO: this should delegate to the DeleteStrategy implementation.
@@ -322,11 +324,16 @@ public class TransactionProcessor extends BaseRegionObserver {
       if (conf != null) {
         pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
                                       TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
-        String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
-        compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable));
-        LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " +
-                    pruneTable);
+        if (Boolean.TRUE.equals(pruneEnable)) {
+          String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                       TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+          long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
+            conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+                         TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+          compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+          LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+                      + pruneTable);
+        }
       }
     }
 
@@ -390,11 +397,13 @@ public class TransactionProcessor extends BaseRegionObserver {
    * Make sure that the transaction is within the max valid transaction lifetime.
    *
    * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated
+   * @param op {@link OperationWithAttributes} HBase operation to access its attributes if required
    * @param tx {@link Transaction} supplied by the
    * @throws DoNotRetryIOException thrown if the transaction is older than the max lifetime of a transaction
    *         IOException throw if the value of max lifetime of transaction is unavailable
    */
   protected void ensureValidTxLifetime(RegionCoprocessorEnvironment env,
+                                       @SuppressWarnings("unused") OperationWithAttributes op,
                                        @Nullable Transaction tx) throws IOException {
     if (tx == null) {
       return;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index 850f508..58596be 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -38,22 +38,27 @@ import javax.annotation.Nullable;
 public class CompactionState {
   private static final Log LOG = LogFactory.getLog(CompactionState.class);
 
+  private final TableName stateTable;
   private final byte[] regionName;
   private final String regionNameAsString;
-  private final TableName stateTable;
   private final DataJanitorState dataJanitorState;
+  private final long pruneFlushInterval;
   private volatile long pruneUpperBound = -1;
 
-  public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable) {
+  private PruneUpperBoundWriter pruneUpperBoundWriter;
+
+  public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long pruneFlushInterval) {
+    this.stateTable = stateTable;
     this.regionName = env.getRegionInfo().getRegionName();
     this.regionNameAsString = env.getRegionInfo().getRegionNameAsString();
-    this.stateTable = stateTable;
     this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public Table get() throws IOException {
         return env.getTable(stateTable);
       }
     });
+    this.pruneFlushInterval = pruneFlushInterval;
+    this.pruneUpperBoundWriter = createPruneUpperBoundWriter();
   }
 
   /**
@@ -75,18 +80,29 @@ public class CompactionState {
   }
 
   /**
+   * Stops the current {@link PruneUpperBoundWriter}.
+   */
+  public void stop() {
+    if (pruneUpperBoundWriter != null) {
+      pruneUpperBoundWriter.stop();
+    }
+  }
+
+  /**
    * Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}.
    * This method is called after the compaction has successfully completed.
    */
   public void persist() {
     if (pruneUpperBound != -1) {
-      try {
-        dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound);
-        LOG.debug(String.format("Saved prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
-      } catch (IOException e) {
-        LOG.warn(String.format("Cannot record prune upper bound in table %s after compacting region %s",
-                               stateTable, regionNameAsString), e);
+      if (!pruneUpperBoundWriter.isAlive()) {
+        pruneUpperBoundWriter = createPruneUpperBoundWriter();
       }
+      pruneUpperBoundWriter.persistPruneEntry(pruneUpperBound);
+      LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
     }
   }
+
+  private PruneUpperBoundWriter createPruneUpperBoundWriter() {
+    return new PruneUpperBoundWriter(dataJanitorState, stateTable, regionNameAsString, regionName, pruneFlushInterval);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
index c6d03c4..51dc181 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
@@ -46,8 +46,8 @@ import javax.annotation.Nullable;
 @SuppressWarnings("WeakerAccess")
 public class DataJanitorState {
   public static final byte[] FAMILY = {'f'};
+  public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
 
-  private static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
   private static final byte[] REGION_TIME_COL = {'r'};
   private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index 2e3b8d0..a63cf75 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -24,8 +24,10 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
@@ -124,6 +126,7 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
                                                             TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
     LOG.info("Initializing plugin with state table {}:{}", stateTable.getNamespaceAsString(),
              stateTable.getNameAsString());
+    createPruneTable(stateTable);
     this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public Table get() throws IOException {
@@ -210,6 +213,38 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
     }
   }
 
+  /**
+   * Create the prune state table given the {@link TableName} if the table doesn't exist already.
+   *
+   * @param stateTable prune state table name
+   */
+  protected void createPruneTable(TableName stateTable) throws IOException {
+    try (Admin admin = this.connection.getAdmin()) {
+      if (admin.tableExists(stateTable)) {
+        LOG.debug("Not creating pruneStateTable {}:{} since it already exists.",
+                  stateTable.getNamespaceAsString(), stateTable.getNameAsString());
+        return;
+      }
+
+      HTableDescriptor htd = new HTableDescriptor(stateTable);
+      htd.addFamily(new HColumnDescriptor(DataJanitorState.FAMILY).setMaxVersions(1));
+      admin.createTable(htd);
+      LOG.info("Created pruneTable {}:{}", stateTable.getNamespaceAsString(), stateTable.getNameAsString());
+    } catch (TableExistsException ex) {
+      // Expected if the prune state table is being created at the same time by another client
+      LOG.debug("Not creating pruneStateTable {}:{} since it already exists.",
+                stateTable.getNamespaceAsString(), stateTable.getNameAsString(), ex);
+    }
+  }
+
+  /**
+   * Returns whether the table is a transactional table. By default, it is a table is identified as a transactional
+   * table if it has a the coprocessor {@link TransactionProcessor} attached to it. Should be overriden if the users
+   * attach a different coprocessor.
+   *
+   * @param tableDescriptor {@link HTableDescriptor} of the table
+   * @return true if the table is transactional
+   */
   protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
     return tableDescriptor.hasCoprocessor(TransactionProcessor.class.getName());
   }