Posted to commits@tephra.apache.org by go...@apache.org on 2017/02/09 04:39:08 UTC

[1/2] incubator-tephra git commit: (TEPHRA-215) (TEPHRA-218) Use single thread across all regions in a region server to persist Prune Upper Bound info. Also don't refresh cache during startup of TransactionStateCache to avoid the possibility of Service stopping if tx.snapshot dir is not found during startup

Repository: incubator-tephra
Updated Branches:
  refs/heads/master aeeee00be -> 69a6cc6be


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 5a355e6..015077b 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -331,8 +331,10 @@ public class TransactionProcessor extends BaseRegionObserver {
             conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
                          TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
           compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
-          LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
-                      + pruneTable);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+                        + pruneTable);
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index 58596be..db7880b 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -38,27 +38,25 @@ import javax.annotation.Nullable;
 public class CompactionState {
   private static final Log LOG = LogFactory.getLog(CompactionState.class);
 
-  private final TableName stateTable;
   private final byte[] regionName;
   private final String regionNameAsString;
-  private final DataJanitorState dataJanitorState;
-  private final long pruneFlushInterval;
-  private volatile long pruneUpperBound = -1;
+  private final PruneUpperBoundWriterSupplier pruneUpperBoundWriterSupplier;
+  private final PruneUpperBoundWriter pruneUpperBoundWriter;
 
-  private PruneUpperBoundWriter pruneUpperBoundWriter;
+  private volatile long pruneUpperBound = -1;
 
   public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long pruneFlushInterval) {
-    this.stateTable = stateTable;
     this.regionName = env.getRegionInfo().getRegionName();
     this.regionNameAsString = env.getRegionInfo().getRegionNameAsString();
-    this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+    DataJanitorState dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public Table get() throws IOException {
         return env.getTable(stateTable);
       }
     });
-    this.pruneFlushInterval = pruneFlushInterval;
-    this.pruneUpperBoundWriter = createPruneUpperBoundWriter();
+    this.pruneUpperBoundWriterSupplier = new PruneUpperBoundWriterSupplier(stateTable, dataJanitorState,
+                                                                           pruneFlushInterval);
+    this.pruneUpperBoundWriter = pruneUpperBoundWriterSupplier.get();
   }
 
   /**
@@ -71,38 +69,33 @@ public class CompactionState {
     if (request.isMajor() && snapshot != null) {
       Transaction tx = TxUtils.createDummyTransaction(snapshot);
       pruneUpperBound = TxUtils.getPruneUpperBound(tx);
-      LOG.debug(
-        String.format("Computed prune upper bound %s for compaction request %s using transaction state from time %s",
-                      pruneUpperBound, request, snapshot.getTimestamp()));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          String.format("Computed prune upper bound %s for compaction request %s using transaction state from time %s",
+                        pruneUpperBound, request, snapshot.getTimestamp()));
+      }
     } else {
       pruneUpperBound = -1;
     }
   }
 
   /**
-   * Stops the current {@link PruneUpperBoundWriter}.
-   */
-  public void stop() {
-    if (pruneUpperBoundWriter != null) {
-      pruneUpperBoundWriter.stop();
-    }
-  }
-
-  /**
    * Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}.
    * This method is called after the compaction has successfully completed.
    */
   public void persist() {
     if (pruneUpperBound != -1) {
-      if (!pruneUpperBoundWriter.isAlive()) {
-        pruneUpperBoundWriter = createPruneUpperBoundWriter();
+      pruneUpperBoundWriter.persistPruneEntry(regionName, pruneUpperBound);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
       }
-      pruneUpperBoundWriter.persistPruneEntry(pruneUpperBound);
-      LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
     }
   }
 
-  private PruneUpperBoundWriter createPruneUpperBoundWriter() {
-    return new PruneUpperBoundWriter(dataJanitorState, stateTable, regionNameAsString, regionName, pruneFlushInterval);
+  /**
+   * Releases the usage of the {@link PruneUpperBoundWriter}.
+   */
+  public void stop() {
+    pruneUpperBoundWriterSupplier.release();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 7bceaff..7e9d1a3 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -18,57 +18,62 @@
 
 package org.apache.tephra.hbase.txprune;
 
+import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.hadoop.hbase.util.Bytes;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * Thread that will write the the prune upper bound
+ * Thread that will write the prune upper bound. An instance of this class should be obtained only
+ * through {@link PruneUpperBoundWriterSupplier} which will also handle the lifecycle of this instance.
  */
-public class PruneUpperBoundWriter {
-  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
+public class PruneUpperBoundWriter extends AbstractIdleService {
+  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
 
-  private final TableName pruneStateTable;
+  private final TableName tableName;
   private final DataJanitorState dataJanitorState;
-  private final byte[] regionName;
-  private final String regionNameAsString;
   private final long pruneFlushInterval;
-  private final AtomicLong pruneUpperBound;
-  private final AtomicBoolean shouldFlush;
+  private final ConcurrentSkipListMap<byte[], Long> pruneEntries;
+
+  private volatile Thread flushThread;
 
-  private Thread flushThread;
   private long lastChecked;
 
-  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString,
-                               byte[] regionName, long pruneFlushInterval) {
-    this.pruneStateTable = pruneStateTable;
+  public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) {
+    this.tableName = tableName;
     this.dataJanitorState = dataJanitorState;
-    this.regionName = regionName;
-    this.regionNameAsString = regionNameAsString;
     this.pruneFlushInterval = pruneFlushInterval;
-    this.pruneUpperBound = new AtomicLong();
-    this.shouldFlush = new AtomicBoolean(false);
-    startFlushThread();
+    this.pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+  }
+
+  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+    // The number of entries in this map is bound by the number of regions in this region server and thus it will not
+    // grow indefinitely
+    pruneEntries.put(regionName, pruneUpperBound);
   }
 
   public boolean isAlive() {
-    return flushThread.isAlive();
+    return flushThread != null && flushThread.isAlive();
   }
 
-  public void persistPruneEntry(long pruneUpperBound) {
-    this.pruneUpperBound.set(pruneUpperBound);
-    this.shouldFlush.set(true);
+  @Override
+  protected void startUp() throws Exception {
+    LOG.info("Starting PruneUpperBoundWriter Thread.");
+    startFlushThread();
   }
 
-  public void stop() {
+  @Override
+  protected void shutDown() throws Exception {
+    LOG.info("Stopping PruneUpperBoundWriter Thread.");
     if (flushThread != null) {
       flushThread.interrupt();
+      flushThread.join(TimeUnit.SECONDS.toMillis(1));
     }
   }
 
@@ -76,19 +81,21 @@ public class PruneUpperBoundWriter {
     flushThread = new Thread("tephra-prune-upper-bound-writer") {
       @Override
       public void run() {
-        while (!isInterrupted()) {
+        while ((!isInterrupted()) && isRunning()) {
           long now = System.currentTimeMillis();
           if (now > (lastChecked + pruneFlushInterval)) {
-            if (shouldFlush.compareAndSet(true, false)) {
-              // should flush data
-              try {
-                dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound.get());
-              } catch (IOException ex) {
-                LOG.warn("Cannot record prune upper bound for region " + regionNameAsString + " in the table " +
-                           pruneStateTable.getNameWithNamespaceInclAsString() + " after compacting region.", ex);
-                // Retry again
-                shouldFlush.set(true);
+            // should flush data
+            try {
+              while (pruneEntries.firstEntry() != null) {
+                Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
+                dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
+                // We can now remove the entry only if the key and value match with what we wrote since it is
+                // possible that a new pruneUpperBound for the same key has been added
+                pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
               }
+            } catch (IOException ex) {
+              LOG.warn("Cannot record prune upper bound for a region to table " +
+                         tableName.getNameWithNamespaceInclAsString(), ex);
             }
             lastChecked = now;
           }
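
The loop above drains the shared map with ConcurrentSkipListMap's two-argument remove, so a newer
prune upper bound enqueued for the same region while a write is in flight is kept for the next
pass. A minimal standalone sketch of that pattern (savePruneUpperBoundForRegion stands in for the
DataJanitorState call and is only a placeholder here):

  ConcurrentSkipListMap<byte[], Long> pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  Map.Entry<byte[], Long> entry;
  while ((entry = pruneEntries.firstEntry()) != null) {
    savePruneUpperBoundForRegion(entry.getKey(), entry.getValue());  // may block on an HBase put
    // Remove only if the value is still the one just written; a concurrent put of a newer upper
    // bound for the same region leaves its fresher entry in place for the next iteration
    pruneEntries.remove(entry.getKey(), entry.getValue());
  }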

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
new file mode 100644
index 0000000..98f3334
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+
+import com.google.common.base.Supplier;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+
+/**
+ * Supplies instances of {@link PruneUpperBoundWriter} implementations.
+ */
+public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> {
+  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class);
+
+  private static volatile PruneUpperBoundWriter instance;
+  private static volatile int refCount = 0;
+  private static final Object lock = new Object();
+
+  private final TableName tableName;
+  private final DataJanitorState dataJanitorState;
+  private final long pruneFlushInterval;
+
+  public PruneUpperBoundWriterSupplier(TableName tableName, DataJanitorState dataJanitorState,
+                                       long pruneFlushInterval) {
+    this.tableName = tableName;
+    this.dataJanitorState = dataJanitorState;
+    this.pruneFlushInterval = pruneFlushInterval;
+  }
+
+  @Override
+  public PruneUpperBoundWriter get() {
+    synchronized (lock) {
+      if (instance == null) {
+        instance = new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
+        instance.startAndWait();
+      }
+      refCount++;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " + refCount);
+      }
+      return instance;
+    }
+  }
+
+  public void release() {
+    synchronized (lock) {
+      refCount--;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Decrementing Reference Count for PruneUpperBoundWriter " + refCount);
+      }
+
+      if (refCount == 0) {
+        try {
+          instance.stopAndWait();
+        } catch (Exception ex) {
+          LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", ex);
+        } finally {
+          // If the thread is still alive (might happen if the thread was blocked on HBase PUT call), interrupt it again
+          if (instance.isAlive()) {
+            try {
+              instance.shutDown();
+            } catch (Exception e) {
+              LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", e);
+            }
+          }
+          instance = null;
+        }
+      }
+    }
+  }
+}
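
Because instance and refCount are static, every supplier created in the same region server JVM
hands out the same writer: the writer thread is started on the first get() and stopped once the
last outstanding reference is released. A small illustration under that assumption (the stateTable
and dataJanitorState arguments are placeholders):

  PruneUpperBoundWriterSupplier supplierA =
      new PruneUpperBoundWriterSupplier(stateTable, dataJanitorState, 1000L);
  PruneUpperBoundWriterSupplier supplierB =
      new PruneUpperBoundWriterSupplier(stateTable, dataJanitorState, 1000L);

  PruneUpperBoundWriter w1 = supplierA.get();  // starts the shared writer, refCount == 1
  PruneUpperBoundWriter w2 = supplierB.get();  // same instance, refCount == 2
  assert w1 == w2;

  supplierA.release();                         // refCount == 1, writer keeps running
  supplierB.release();                         // refCount == 0, writer thread is stopped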

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
new file mode 100644
index 0000000..08d3b49
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tests to verify the behavior of {@link PruneUpperBoundWriterSupplier}.
+ */
+public class PruneUpperBoundWriterSupplierTest {
+  private static final Logger LOG = LoggerFactory.getLogger(PruneUpperBoundWriterSupplierTest.class);
+  private static final int NUM_OPS = 10000;
+  private static final int NUM_THREADS = 50;
+
+  @Test
+  public void testSupplier() throws Exception {
+    final PruneUpperBoundWriterSupplier supplier = new PruneUpperBoundWriterSupplier(null, null, 10L);
+    // Get one instance now, for later comparisons
+    final PruneUpperBoundWriter writer = supplier.get();
+    final AtomicInteger numOps = new AtomicInteger(NUM_OPS);
+    final Random random = new Random(System.currentTimeMillis());
+
+    // Start threads that will 'get' PruneUpperBoundWriters
+    ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
+    List<Future> futureList = new ArrayList<>();
+    for (int i = 0; i < NUM_THREADS; i++) {
+      futureList.add(executor.submit(new Runnable() {
+
+        @Override
+        public void run() {
+          // Perform NUM_OPS 'gets' of PruneUpperBoundWriter
+          while (numOps.decrementAndGet() > 0) {
+            PruneUpperBoundWriter newWriter = supplier.get();
+            Assert.assertTrue(newWriter == writer);
+            int waitTime = random.nextInt(10);
+            try {
+              TimeUnit.MICROSECONDS.sleep(waitTime);
+            } catch (InterruptedException e) {
+              LOG.warn("Received an exception.", e);
+            }
+          }
+        }
+      }));
+    }
+
+    for (Future future : futureList) {
+      future.get(5, TimeUnit.SECONDS);
+    }
+    executor.shutdown();
+    executor.awaitTermination(2, TimeUnit.SECONDS);
+
+    futureList.clear();
+    numOps.set(NUM_OPS);
+    // Start threads that release PruneUpperBoundWriters
+    executor = Executors.newFixedThreadPool(NUM_THREADS);
+    for (int i = 0; i < NUM_THREADS; i++) {
+      futureList.add(executor.submit(new Runnable() {
+
+        @Override
+        public void run() {
+          // We need to release all NUM_OPS 'gets' that were executed to trigger shutdown of the single instance of
+          // PruneUpperBoundWriter
+          while (numOps.decrementAndGet() > 0) {
+            supplier.release();
+            try {
+              TimeUnit.MICROSECONDS.sleep(random.nextInt(10));
+            } catch (InterruptedException e) {
+              LOG.warn("Received an exception.", e);
+            }
+          }
+        }
+      }));
+    }
+
+    for (Future future : futureList) {
+      future.get(1, TimeUnit.SECONDS);
+    }
+
+    executor.shutdown();
+    executor.awaitTermination(2, TimeUnit.SECONDS);
+
+    // Verify that the PruneUpperBoundWriter is still running and the pruneThread is still alive.
+    Assert.assertTrue(writer.isRunning());
+    Assert.assertTrue(writer.isAlive());
+
+    // Since we got one instance in the beginning, we need to release it
+    supplier.release();
+
+    // Verify that the PruneUpperBoundWriter is shutdown and the pruneThread is not alive anymore.
+    Assert.assertFalse(writer.isRunning());
+    Assert.assertFalse(writer.isAlive());
+  }
+}


[2/2] incubator-tephra git commit: (TEPHRA-215) (TEPHRA-218) Use single thread across all regions in a region server to persist Prune Upper Bound info. Also don't refresh cache during startup of TransactionStateCache to avoid the possibility of Service stopping if tx.snapshot dir is not found during startup

Posted by go...@apache.org.
(TEPHRA-215) (TEPHRA-218) Use single thread across all regions in a region server to persist Prune Upper Bound info. Also don't refresh cache during startup of TransactionStateCache to avoid the possibility of Service stopping if tx.snapshot dir is not found during startup
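
For context, a minimal sketch of how the pieces in this change fit together (class and method
names are taken from the diffs below; the coprocessor environment, compaction request and
transaction snapshot arguments are stand-ins):

  // Each region still creates its own CompactionState in the TransactionProcessor coprocessor
  CompactionState compactionState =
      new CompactionState(env, TableName.valueOf("tephra.state"), pruneFlushInterval);

  // On a major compaction, record() computes the prune upper bound from the transaction snapshot
  compactionState.record(compactionRequest, transactionVisibilityState);

  // persist() enqueues (regionName, pruneUpperBound) on the single PruneUpperBoundWriter that is
  // shared by all regions in the region server; its flush thread writes the queued entries out
  compactionState.persist();

  // stop() releases the reference obtained from PruneUpperBoundWriterSupplier; the shared writer
  // thread is stopped only when the last region releases it
  compactionState.stop();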

This closes #32 from GitHub.

Signed-off-by: Gokul Gunasekaran <go...@cask.co>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/69a6cc6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/69a6cc6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/69a6cc6b

Branch: refs/heads/master
Commit: 69a6cc6be4fb8ae47fecda71d28d1900109ac866
Parents: aeeee00
Author: Gokul Gunasekaran <go...@cask.co>
Authored: Tue Feb 7 00:17:14 2017 -0800
Committer: Gokul Gunasekaran <go...@cask.co>
Committed: Wed Feb 8 20:36:10 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/tephra/TxConstants.java     |   2 +-
 .../coprocessor/TransactionStateCache.java      |  12 +-
 .../hbase/coprocessor/TransactionProcessor.java |   6 +-
 .../tephra/hbase/txprune/CompactionState.java   |  47 +++----
 .../hbase/txprune/PruneUpperBoundWriter.java    |  78 ++++++------
 .../txprune/PruneUpperBoundWriterSupplier.java  |  89 ++++++++++++++
 .../PruneUpperBoundWriterSupplierTest.java      | 122 +++++++++++++++++++
 .../hbase/coprocessor/TransactionProcessor.java |   6 +-
 .../tephra/hbase/txprune/CompactionState.java   |  51 ++++----
 .../hbase/txprune/PruneUpperBoundWriter.java    |  78 ++++++------
 .../txprune/PruneUpperBoundWriterSupplier.java  |  89 ++++++++++++++
 .../PruneUpperBoundWriterSupplierTest.java      | 122 +++++++++++++++++++
 .../hbase/coprocessor/TransactionProcessor.java |   6 +-
 .../tephra/hbase/txprune/CompactionState.java   |  47 +++----
 .../hbase/txprune/PruneUpperBoundWriter.java    |  77 ++++++------
 .../txprune/PruneUpperBoundWriterSupplier.java  |  89 ++++++++++++++
 .../PruneUpperBoundWriterSupplierTest.java      | 122 +++++++++++++++++++
 .../hbase/coprocessor/TransactionProcessor.java |   6 +-
 .../tephra/hbase/txprune/CompactionState.java   |  47 +++----
 .../hbase/txprune/PruneUpperBoundWriter.java    |  78 ++++++------
 .../txprune/PruneUpperBoundWriterSupplier.java  |  89 ++++++++++++++
 .../PruneUpperBoundWriterSupplierTest.java      | 122 +++++++++++++++++++
 .../hbase/coprocessor/TransactionProcessor.java |   6 +-
 .../tephra/hbase/txprune/CompactionState.java   |  47 +++----
 .../hbase/txprune/PruneUpperBoundWriter.java    |  77 ++++++------
 .../txprune/PruneUpperBoundWriterSupplier.java  |  89 ++++++++++++++
 .../PruneUpperBoundWriterSupplierTest.java      | 122 +++++++++++++++++++
 27 files changed, 1397 insertions(+), 329 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
index 1988abf..ebf91e3 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
@@ -385,7 +385,7 @@ public class TxConstants {
     public static final String PLUGIN_CLASS_SUFFIX = ".class";
 
     public static final boolean DEFAULT_PRUNE_ENABLE = false;
-    public static final String DEFAULT_PRUNE_STATE_TABLE = "data_tx_janitor_state";
+    public static final String DEFAULT_PRUNE_STATE_TABLE = "tephra.state";
     public static final long DEFAULT_PRUNE_INTERVAL = TimeUnit.HOURS.toSeconds(6);
     public static final long DEFAULT_PRUNE_FLUSH_INTERVAL = TimeUnit.MINUTES.toSeconds(1);
     public static final String DEFAULT_PLUGIN = "data.tx.prune.plugin.default";

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCache.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCache.java b/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCache.java
index f1d62b8..5869685 100644
--- a/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCache.java
+++ b/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCache.java
@@ -33,6 +33,7 @@ import org.apache.tephra.util.ConfigurationFactory;
 
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 
 /**
  * Periodically refreshes transaction state from the latest stored snapshot.  This is implemented as a singleton
@@ -70,7 +71,11 @@ public class TransactionStateCache extends AbstractIdleService implements Config
 
   @Override
   protected void startUp() throws Exception {
-    refreshState();
+    try {
+      refreshState();
+    } catch (IOException ioe) {
+      LOG.info("Error refreshing transaction state cache.", ioe);
+    }
     startRefreshService();
   }
 
@@ -100,7 +105,7 @@ public class TransactionStateCache extends AbstractIdleService implements Config
         LOG.info("Could not load configuration");
       }
     } catch (Exception e) {
-      LOG.info("Failed to initialize TransactionStateCache due to: " + e.getMessage());
+      LOG.info("Failed to initialize TransactionStateCache due to: ", e);
     }
   }
 
@@ -125,7 +130,7 @@ public class TransactionStateCache extends AbstractIdleService implements Config
             try {
               refreshState();
             } catch (IOException ioe) {
-              LOG.info("Error refreshing transaction state cache: " + ioe.getMessage());
+              LOG.info("Error refreshing transaction state cache.", ioe);
             }
           }
           try {
@@ -170,6 +175,7 @@ public class TransactionStateCache extends AbstractIdleService implements Config
     }
   }
 
+  @Nullable
   public TransactionVisibilityState getLatestState() {
     return latestState;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 3d1c7f1..53a8957 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -331,8 +331,10 @@ public class TransactionProcessor extends BaseRegionObserver {
             conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
                          TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
           compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
-          LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
-                      + pruneTable);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+                        + pruneTable);
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index f4f1d43..c1f1825 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -38,27 +38,25 @@ import javax.annotation.Nullable;
 public class CompactionState {
   private static final Log LOG = LogFactory.getLog(CompactionState.class);
 
-  private final TableName stateTable;
   private final byte[] regionName;
   private final String regionNameAsString;
-  private final DataJanitorState dataJanitorState;
-  private final long pruneFlushInterval;
-  private volatile long pruneUpperBound = -1;
+  private final PruneUpperBoundWriterSupplier pruneUpperBoundWriterSupplier;
+  private final PruneUpperBoundWriter pruneUpperBoundWriter;
 
-  private PruneUpperBoundWriter pruneUpperBoundWriter;
+  private volatile long pruneUpperBound = -1;
 
   public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long pruneFlushInterval) {
-    this.stateTable = stateTable;
     this.regionName = env.getRegion().getRegionName();
     this.regionNameAsString = env.getRegion().getRegionNameAsString();
-    this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+    DataJanitorState dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public HTableInterface get() throws IOException {
         return env.getTable(stateTable);
       }
     });
-    this.pruneFlushInterval = pruneFlushInterval;
-    this.pruneUpperBoundWriter = createPruneUpperBoundWriter();
+    this.pruneUpperBoundWriterSupplier = new PruneUpperBoundWriterSupplier(stateTable, dataJanitorState,
+                                                                           pruneFlushInterval);
+    this.pruneUpperBoundWriter = pruneUpperBoundWriterSupplier.get();
   }
 
   /**
@@ -71,38 +69,33 @@ public class CompactionState {
     if (request.isMajor() && snapshot != null) {
       Transaction tx = TxUtils.createDummyTransaction(snapshot);
       pruneUpperBound = TxUtils.getPruneUpperBound(tx);
-      LOG.debug(
-        String.format("Computed prune upper bound %s for compaction request %s using transaction state from time %s",
-                      pruneUpperBound, request, snapshot.getTimestamp()));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          String.format("Computed prune upper bound %s for compaction request %s using transaction state from time %s",
+                        pruneUpperBound, request, snapshot.getTimestamp()));
+      }
     } else {
       pruneUpperBound = -1;
     }
   }
 
   /**
-   * Stops the current {@link PruneUpperBoundWriter}.
-   */
-  public void stop() {
-    if (pruneUpperBoundWriter != null) {
-      pruneUpperBoundWriter.stop();
-    }
-  }
-
-  /**
    * Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}.
    * This method is called after the compaction has successfully completed.
    */
   public void persist() {
     if (pruneUpperBound != -1) {
-      if (!pruneUpperBoundWriter.isAlive()) {
-        pruneUpperBoundWriter = createPruneUpperBoundWriter();
+      pruneUpperBoundWriter.persistPruneEntry(regionName, pruneUpperBound);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
       }
-      pruneUpperBoundWriter.persistPruneEntry(pruneUpperBound);
-      LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
     }
   }
 
-  private PruneUpperBoundWriter createPruneUpperBoundWriter() {
-    return new PruneUpperBoundWriter(dataJanitorState, stateTable, regionNameAsString, regionName, pruneFlushInterval);
+  /**
+   * Releases the usage of the {@link PruneUpperBoundWriter}.
+   */
+  public void stop() {
+    pruneUpperBoundWriterSupplier.release();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index c981e15..5a86b4a 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -18,57 +18,62 @@
 
 package org.apache.tephra.hbase.txprune;
 
+import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.hadoop.hbase.util.Bytes;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * Thread that will write the the prune upper bound
+ * Thread that will write the prune upper bound. An instance of this class should be obtained only
+ * through {@link PruneUpperBoundWriterSupplier} which will also handle the lifecycle of this instance.
  */
-public class PruneUpperBoundWriter {
-  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
+public class PruneUpperBoundWriter extends AbstractIdleService {
+  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
 
-  private final TableName pruneStateTable;
+  private final TableName tableName;
   private final DataJanitorState dataJanitorState;
-  private final byte[] regionName;
-  private final String regionNameAsString;
   private final long pruneFlushInterval;
-  private final AtomicLong pruneUpperBound;
-  private final AtomicBoolean shouldFlush;
+  private final ConcurrentSkipListMap<byte[], Long> pruneEntries;
+
+  private volatile Thread flushThread;
 
-  private Thread flushThread;
   private long lastChecked;
 
-  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString,
-                               byte[] regionName, long pruneFlushInterval) {
-    this.pruneStateTable = pruneStateTable;
+  public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) {
+    this.tableName = tableName;
     this.dataJanitorState = dataJanitorState;
-    this.regionName = regionName;
-    this.regionNameAsString = regionNameAsString;
     this.pruneFlushInterval = pruneFlushInterval;
-    this.pruneUpperBound = new AtomicLong();
-    this.shouldFlush = new AtomicBoolean(false);
-    startFlushThread();
+    this.pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+  }
+
+  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+    // The number of entries in this map is bound by the number of regions in this region server and thus it will not
+    // grow indefinitely
+    pruneEntries.put(regionName, pruneUpperBound);
   }
 
   public boolean isAlive() {
-    return flushThread.isAlive();
+    return flushThread != null && flushThread.isAlive();
   }
 
-  public void persistPruneEntry(long pruneUpperBound) {
-    this.pruneUpperBound.set(pruneUpperBound);
-    this.shouldFlush.set(true);
+  @Override
+  protected void startUp() throws Exception {
+    LOG.info("Starting PruneUpperBoundWriter Thread.");
+    startFlushThread();
   }
 
-  public void stop() {
+  @Override
+  protected void shutDown() throws Exception {
+    LOG.info("Stopping PruneUpperBoundWriter Thread.");
     if (flushThread != null) {
       flushThread.interrupt();
+      flushThread.join(TimeUnit.SECONDS.toMillis(1));
     }
   }
 
@@ -76,20 +81,21 @@ public class PruneUpperBoundWriter {
     flushThread = new Thread("tephra-prune-upper-bound-writer") {
       @Override
       public void run() {
-        while (!isInterrupted()) {
+        while ((!isInterrupted()) && isRunning()) {
           long now = System.currentTimeMillis();
           if (now > (lastChecked + pruneFlushInterval)) {
-            if (shouldFlush.compareAndSet(true, false)) {
-              // should flush data
-              try {
-                dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound.get());
-              } catch (IOException ex) {
-                LOG.warn("Cannot record prune upper bound for region " + regionNameAsString + " in the table " +
-                           pruneStateTable.getNamespaceAsString() + ":" + pruneStateTable.getNameAsString() +
-                           " after compacting region.", ex);
-                // Retry again
-                shouldFlush.set(true);
+            // should flush data
+            try {
+              while (pruneEntries.firstEntry() != null) {
+                Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
+                dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
+                // We can now remove the entry only if the key and value match with what we wrote since it is
+                // possible that a new pruneUpperBound for the same key has been added
+                pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
               }
+            } catch (IOException ex) {
+              LOG.warn("Cannot record prune upper bound for a region to table " +
+                         tableName.getNamespaceAsString() + ":" + tableName.getNameAsString(), ex);
             }
             lastChecked = now;
           }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
new file mode 100644
index 0000000..98f3334
--- /dev/null
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+
+import com.google.common.base.Supplier;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+
+/**
+ * Supplies instances of {@link PruneUpperBoundWriter} implementations.
+ */
+public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> {
+  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class);
+
+  private static volatile PruneUpperBoundWriter instance;
+  private static volatile int refCount = 0;
+  private static final Object lock = new Object();
+
+  private final TableName tableName;
+  private final DataJanitorState dataJanitorState;
+  private final long pruneFlushInterval;
+
+  public PruneUpperBoundWriterSupplier(TableName tableName, DataJanitorState dataJanitorState,
+                                       long pruneFlushInterval) {
+    this.tableName = tableName;
+    this.dataJanitorState = dataJanitorState;
+    this.pruneFlushInterval = pruneFlushInterval;
+  }
+
+  @Override
+  public PruneUpperBoundWriter get() {
+    synchronized (lock) {
+      if (instance == null) {
+        instance = new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
+        instance.startAndWait();
+      }
+      refCount++;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " + refCount);
+      }
+      return instance;
+    }
+  }
+
+  public void release() {
+    synchronized (lock) {
+      refCount--;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Decrementing Reference Count for PruneUpperBoundWriter " + refCount);
+      }
+
+      if (refCount == 0) {
+        try {
+          instance.stopAndWait();
+        } catch (Exception ex) {
+          LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", ex);
+        } finally {
+          // If the thread is still alive (might happen if the thread was blocked on HBase PUT call), interrupt it again
+          if (instance.isAlive()) {
+            try {
+              instance.shutDown();
+            } catch (Exception e) {
+              LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", e);
+            }
+          }
+          instance = null;
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
new file mode 100644
index 0000000..08d3b49
--- /dev/null
+++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tests to verify the behavior of {@link PruneUpperBoundWriterSupplier}.
+ */
+public class PruneUpperBoundWriterSupplierTest {
+  private static final Logger LOG = LoggerFactory.getLogger(PruneUpperBoundWriterSupplierTest.class);
+  private static final int NUM_OPS = 10000;
+  private static final int NUM_THREADS = 50;
+
+  @Test
+  public void testSupplier() throws Exception {
+    final PruneUpperBoundWriterSupplier supplier = new PruneUpperBoundWriterSupplier(null, null, 10L);
+    // Get one instance now, for later comparisons
+    final PruneUpperBoundWriter writer = supplier.get();
+    final AtomicInteger numOps = new AtomicInteger(NUM_OPS);
+    final Random random = new Random(System.currentTimeMillis());
+
+    // Start threads that will 'get' PruneUpperBoundWriters
+    ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
+    List<Future> futureList = new ArrayList<>();
+    for (int i = 0; i < NUM_THREADS; i++) {
+      futureList.add(executor.submit(new Runnable() {
+
+        @Override
+        public void run() {
+          // Perform NUM_OPS 'gets' of PruneUpperBoundWriter
+          while (numOps.decrementAndGet() > 0) {
+            PruneUpperBoundWriter newWriter = supplier.get();
+            Assert.assertTrue(newWriter == writer);
+            int waitTime = random.nextInt(10);
+            try {
+              TimeUnit.MICROSECONDS.sleep(waitTime);
+            } catch (InterruptedException e) {
+              LOG.warn("Received an exception.", e);
+            }
+          }
+        }
+      }));
+    }
+
+    for (Future future : futureList) {
+      future.get(5, TimeUnit.SECONDS);
+    }
+    executor.shutdown();
+    executor.awaitTermination(2, TimeUnit.SECONDS);
+
+    futureList.clear();
+    numOps.set(NUM_OPS);
+    // Start threads that release PruneUpperBoundWriters
+    executor = Executors.newFixedThreadPool(NUM_THREADS);
+    for (int i = 0; i < NUM_THREADS; i++) {
+      futureList.add(executor.submit(new Runnable() {
+
+        @Override
+        public void run() {
+          // We need to release all NUM_OPS 'gets' that were executed to trigger shutdown of the single instance of
+          // PruneUpperBoundWriter
+          while (numOps.decrementAndGet() > 0) {
+            supplier.release();
+            try {
+              TimeUnit.MICROSECONDS.sleep(random.nextInt(10));
+            } catch (InterruptedException e) {
+              LOG.warn("Received an exception.", e);
+            }
+          }
+        }
+      }));
+    }
+
+    for (Future future : futureList) {
+      future.get(1, TimeUnit.SECONDS);
+    }
+
+    executor.shutdown();
+    executor.awaitTermination(2, TimeUnit.SECONDS);
+
+    // Verify that the PruneUpperBoundWriter is still running and the pruneThread is still alive.
+    Assert.assertTrue(writer.isRunning());
+    Assert.assertTrue(writer.isAlive());
+
+    // Since we got one instance in the beginning, we need to release it
+    supplier.release();
+
+    // Verify that the PruneUpperBoundWriter is shutdown and the pruneThread is not alive anymore.
+    Assert.assertFalse(writer.isRunning());
+    Assert.assertFalse(writer.isAlive());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 728adfa..9e9dd46 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -331,8 +331,10 @@ public class TransactionProcessor extends BaseRegionObserver {
             conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
                          TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
           compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
-          LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
-                      + pruneTable);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+                        + pruneTable);
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index f4f1d43..a1d1e35 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -38,27 +38,25 @@ import javax.annotation.Nullable;
 public class CompactionState {
   private static final Log LOG = LogFactory.getLog(CompactionState.class);
 
-  private final TableName stateTable;
   private final byte[] regionName;
   private final String regionNameAsString;
-  private final DataJanitorState dataJanitorState;
-  private final long pruneFlushInterval;
-  private volatile long pruneUpperBound = -1;
+  private final PruneUpperBoundWriterSupplier pruneUpperBoundWriterSupplier;
+  private final PruneUpperBoundWriter pruneUpperBoundWriter;
 
-  private PruneUpperBoundWriter pruneUpperBoundWriter;
+  private volatile long pruneUpperBound = -1;
 
   public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long pruneFlushInterval) {
-    this.stateTable = stateTable;
-    this.regionName = env.getRegion().getRegionName();
-    this.regionNameAsString = env.getRegion().getRegionNameAsString();
-    this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+    this.regionName = env.getRegionInfo().getRegionName();
+    this.regionNameAsString = env.getRegionInfo().getRegionNameAsString();
+    DataJanitorState dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public HTableInterface get() throws IOException {
         return env.getTable(stateTable);
       }
     });
-    this.pruneFlushInterval = pruneFlushInterval;
-    this.pruneUpperBoundWriter = createPruneUpperBoundWriter();
+    this.pruneUpperBoundWriterSupplier = new PruneUpperBoundWriterSupplier(stateTable, dataJanitorState,
+                                                                           pruneFlushInterval);
+    this.pruneUpperBoundWriter = pruneUpperBoundWriterSupplier.get();
   }
 
   /**
@@ -71,38 +69,33 @@ public class CompactionState {
     if (request.isMajor() && snapshot != null) {
       Transaction tx = TxUtils.createDummyTransaction(snapshot);
       pruneUpperBound = TxUtils.getPruneUpperBound(tx);
-      LOG.debug(
-        String.format("Computed prune upper bound %s for compaction request %s using transaction state from time %s",
-                      pruneUpperBound, request, snapshot.getTimestamp()));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          String.format("Computed prune upper bound %s for compaction request %s using transaction state from time %s",
+                        pruneUpperBound, request, snapshot.getTimestamp()));
+      }
     } else {
       pruneUpperBound = -1;
     }
   }
 
   /**
-   * Stops the current {@link PruneUpperBoundWriter}.
-   */
-  public void stop() {
-    if (pruneUpperBoundWriter != null) {
-      pruneUpperBoundWriter.stop();
-    }
-  }
-
-  /**
    * Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}.
    * This method is called after the compaction has successfully completed.
    */
   public void persist() {
     if (pruneUpperBound != -1) {
-      if (!pruneUpperBoundWriter.isAlive()) {
-        pruneUpperBoundWriter = createPruneUpperBoundWriter();
+      pruneUpperBoundWriter.persistPruneEntry(regionName, pruneUpperBound);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
       }
-      pruneUpperBoundWriter.persistPruneEntry(pruneUpperBound);
-      LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
     }
   }
 
-  private PruneUpperBoundWriter createPruneUpperBoundWriter() {
-    return new PruneUpperBoundWriter(dataJanitorState, stateTable, regionNameAsString, regionName, pruneFlushInterval);
+  /**
+   * Releases the usage of the {@link PruneUpperBoundWriter}.
+   */
+  public void stop() {
+    pruneUpperBoundWriterSupplier.release();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index c981e15..5a86b4a 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -18,57 +18,62 @@
 
 package org.apache.tephra.hbase.txprune;
 
+import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.hadoop.hbase.util.Bytes;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * Thread that will write the the prune upper bound
+ * Thread that will write the prune upper bound. An instance of this class should be obtained only
+ * through {@link PruneUpperBoundWriterSupplier} which will also handle the lifecycle of this instance.
  */
-public class PruneUpperBoundWriter {
-  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
+public class PruneUpperBoundWriter extends AbstractIdleService {
+  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
 
-  private final TableName pruneStateTable;
+  private final TableName tableName;
   private final DataJanitorState dataJanitorState;
-  private final byte[] regionName;
-  private final String regionNameAsString;
   private final long pruneFlushInterval;
-  private final AtomicLong pruneUpperBound;
-  private final AtomicBoolean shouldFlush;
+  private final ConcurrentSkipListMap<byte[], Long> pruneEntries;
+
+  private volatile Thread flushThread;
 
-  private Thread flushThread;
   private long lastChecked;
 
-  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString,
-                               byte[] regionName, long pruneFlushInterval) {
-    this.pruneStateTable = pruneStateTable;
+  public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) {
+    this.tableName = tableName;
     this.dataJanitorState = dataJanitorState;
-    this.regionName = regionName;
-    this.regionNameAsString = regionNameAsString;
     this.pruneFlushInterval = pruneFlushInterval;
-    this.pruneUpperBound = new AtomicLong();
-    this.shouldFlush = new AtomicBoolean(false);
-    startFlushThread();
+    this.pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+  }
+
+  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+    // The number of entries in this map is bounded by the number of regions in this region server and thus it will not
+    // grow indefinitely
+    pruneEntries.put(regionName, pruneUpperBound);
   }
 
   public boolean isAlive() {
-    return flushThread.isAlive();
+    return flushThread != null && flushThread.isAlive();
   }
 
-  public void persistPruneEntry(long pruneUpperBound) {
-    this.pruneUpperBound.set(pruneUpperBound);
-    this.shouldFlush.set(true);
+  @Override
+  protected void startUp() throws Exception {
+    LOG.info("Starting PruneUpperBoundWriter Thread.");
+    startFlushThread();
   }
 
-  public void stop() {
+  @Override
+  protected void shutDown() throws Exception {
+    LOG.info("Stopping PruneUpperBoundWriter Thread.");
     if (flushThread != null) {
       flushThread.interrupt();
+      flushThread.join(TimeUnit.SECONDS.toMillis(1));
     }
   }
 
@@ -76,20 +81,21 @@ public class PruneUpperBoundWriter {
     flushThread = new Thread("tephra-prune-upper-bound-writer") {
       @Override
       public void run() {
-        while (!isInterrupted()) {
+        while ((!isInterrupted()) && isRunning()) {
           long now = System.currentTimeMillis();
           if (now > (lastChecked + pruneFlushInterval)) {
-            if (shouldFlush.compareAndSet(true, false)) {
-              // should flush data
-              try {
-                dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound.get());
-              } catch (IOException ex) {
-                LOG.warn("Cannot record prune upper bound for region " + regionNameAsString + " in the table " +
-                           pruneStateTable.getNamespaceAsString() + ":" + pruneStateTable.getNameAsString() +
-                           " after compacting region.", ex);
-                // Retry again
-                shouldFlush.set(true);
+            // should flush data
+            try {
+              while (pruneEntries.firstEntry() != null) {
+                Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
+                dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
+                // We can now remove the entry only if the key and value match with what we wrote since it is
+                // possible that a new pruneUpperBound for the same key has been added
+                pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
               }
+            } catch (IOException ex) {
+              LOG.warn("Cannot record prune upper bound for a region to table " +
+                         tableName.getNamespaceAsString() + ":" + tableName.getNameAsString(), ex);
             }
             lastChecked = now;
           }
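
The drain loop above relies on ConcurrentMap.remove(key, value): an entry is removed only if nothing newer was enqueued for the same region while the write was in flight. A small self-contained sketch of that idiom, with String keys instead of byte[] region names and printing in place of the HBase write:

    import java.util.Map;
    import java.util.concurrent.ConcurrentSkipListMap;

    public class DrainSketch {
      public static void main(String[] args) {
        ConcurrentSkipListMap<String, Long> pruneEntries = new ConcurrentSkipListMap<>();
        pruneEntries.put("region-a", 100L);
        pruneEntries.put("region-b", 200L);

        while (pruneEntries.firstEntry() != null) {
          Map.Entry<String, Long> firstEntry = pruneEntries.firstEntry();
          // Stand-in for persisting firstEntry.getKey() -> firstEntry.getValue() to the state table
          System.out.println("persisted " + firstEntry.getKey() + " -> " + firstEntry.getValue());
          // Conditional remove: if a newer bound was put for the same region after the read above,
          // the key/value no longer match, the entry stays, and it is written on the next iteration.
          pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
        }
      }
    }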

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
new file mode 100644
index 0000000..98f3334
--- /dev/null
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+
+import com.google.common.base.Supplier;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+
+/**
+ * Supplies a shared, reference-counted instance of {@link PruneUpperBoundWriter}.
+ */
+public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> {
+  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class);
+
+  private static volatile PruneUpperBoundWriter instance;
+  private static volatile int refCount = 0;
+  private static final Object lock = new Object();
+
+  private final TableName tableName;
+  private final DataJanitorState dataJanitorState;
+  private final long pruneFlushInterval;
+
+  public PruneUpperBoundWriterSupplier(TableName tableName, DataJanitorState dataJanitorState,
+                                       long pruneFlushInterval) {
+    this.tableName = tableName;
+    this.dataJanitorState = dataJanitorState;
+    this.pruneFlushInterval = pruneFlushInterval;
+  }
+
+  @Override
+  public PruneUpperBoundWriter get() {
+    synchronized (lock) {
+      if (instance == null) {
+        instance = new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
+        instance.startAndWait();
+      }
+      refCount++;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " + refCount);
+      }
+      return instance;
+    }
+  }
+
+  public void release() {
+    synchronized (lock) {
+      refCount--;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Decrementing Reference Count for PruneUpperBoundWriter " + refCount);
+      }
+
+      if (refCount == 0) {
+        try {
+          instance.stopAndWait();
+        } catch (Exception ex) {
+          LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", ex);
+        } finally {
+          // If the thread is still alive (might happen if the thread was blocked on HBase PUT call), interrupt it again
+          if (instance.isAlive()) {
+            try {
+              instance.shutDown();
+            } catch (Exception e) {
+              LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", e);
+            }
+          }
+          instance = null;
+        }
+      }
+    }
+  }
+}
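
The supplier is a reference-counted singleton: every CompactionState shares one writer through get(), and only the last release() stops it. A stripped-down sketch of the pattern, with a plain Object standing in for the writer service:

    public class RefCountSketch {
      static class SketchSupplier {
        private static Object instance;              // one shared instance per JVM / region server
        private static int refCount = 0;
        private static final Object lock = new Object();

        Object get() {
          synchronized (lock) {
            if (instance == null) {
              instance = new Object();               // the real code starts the writer service here
            }
            refCount++;
            return instance;
          }
        }

        void release() {
          synchronized (lock) {
            if (--refCount == 0) {
              instance = null;                       // the real code stops the writer service here
            }
          }
        }
      }

      public static void main(String[] args) {
        SketchSupplier supplier = new SketchSupplier();
        Object first = supplier.get();               // e.g. first region opens
        Object second = supplier.get();              // e.g. second region opens
        System.out.println(first == second);         // true: both share the same instance
        supplier.release();                          // one region closes, the instance stays up
        supplier.release();                          // last reference released, the instance is torn down
      }
    }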

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
new file mode 100644
index 0000000..08d3b49
--- /dev/null
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tests to verify the behavior of {@link PruneUpperBoundWriterSupplier}.
+ */
+public class PruneUpperBoundWriterSupplierTest {
+  private static final Logger LOG = LoggerFactory.getLogger(PruneUpperBoundWriterSupplierTest.class);
+  private static final int NUM_OPS = 10000;
+  private static final int NUM_THREADS = 50;
+
+  @Test
+  public void testSupplier() throws Exception {
+    final PruneUpperBoundWriterSupplier supplier = new PruneUpperBoundWriterSupplier(null, null, 10L);
+    // Get one instance now, for later comparisons
+    final PruneUpperBoundWriter writer = supplier.get();
+    final AtomicInteger numOps = new AtomicInteger(NUM_OPS);
+    final Random random = new Random(System.currentTimeMillis());
+
+    // Start threads that will 'get' PruneUpperBoundWriters
+    ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
+    List<Future> futureList = new ArrayList<>();
+    for (int i = 0; i < NUM_THREADS; i++) {
+      futureList.add(executor.submit(new Runnable() {
+
+        @Override
+        public void run() {
+          // Perform NUM_OPS 'gets' of PruneUpperBoundWriter
+          while (numOps.decrementAndGet() > 0) {
+            PruneUpperBoundWriter newWriter = supplier.get();
+            Assert.assertTrue(newWriter == writer);
+            int waitTime = random.nextInt(10);
+            try {
+              TimeUnit.MICROSECONDS.sleep(waitTime);
+            } catch (InterruptedException e) {
+              LOG.warn("Received an exception.", e);
+            }
+          }
+        }
+      }));
+    }
+
+    for (Future future : futureList) {
+      future.get(5, TimeUnit.SECONDS);
+    }
+    executor.shutdown();
+    executor.awaitTermination(2, TimeUnit.SECONDS);
+
+    futureList.clear();
+    numOps.set(NUM_OPS);
+    // Start threads that release PruneUpperBoundWriters
+    executor = Executors.newFixedThreadPool(NUM_THREADS);
+    for (int i = 0; i < NUM_THREADS; i++) {
+      futureList.add(executor.submit(new Runnable() {
+
+        @Override
+        public void run() {
+          // We need to release all NUM_OPS 'gets' that were executed to trigger shutdown of the single instance of
+          // PruneUpperBoundWriter
+          while (numOps.decrementAndGet() > 0) {
+            supplier.release();
+            try {
+              TimeUnit.MICROSECONDS.sleep(random.nextInt(10));
+            } catch (InterruptedException e) {
+              LOG.warn("Received an exception.", e);
+            }
+          }
+        }
+      }));
+    }
+
+    for (Future future : futureList) {
+      future.get(1, TimeUnit.SECONDS);
+    }
+
+    executor.shutdown();
+    executor.awaitTermination(2, TimeUnit.SECONDS);
+
+    // Verify that the PruneUpperBoundWriter is still running and the flush thread is still alive.
+    Assert.assertTrue(writer.isRunning());
+    Assert.assertTrue(writer.isAlive());
+
+    // Since we got one instance in the beginning, we need to release it
+    supplier.release();
+
+    // Verify that the PruneUpperBoundWriter is shut down and the flush thread is no longer alive.
+    Assert.assertFalse(writer.isRunning());
+    Assert.assertFalse(writer.isAlive());
+  }
+}
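
A note on the test structure: the assertions run inside worker Runnables, so failures only surface when the corresponding Future is get()'d with a timeout. A minimal sketch of that propagation:

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class FuturePropagationSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> future = executor.submit(new Runnable() {
          @Override
          public void run() {
            throw new AssertionError("failed check inside worker thread");
          }
        });
        try {
          future.get(1, TimeUnit.SECONDS);           // re-throws the worker's failure here
        } catch (ExecutionException e) {
          System.out.println("Worker failed: " + e.getCause());
        } finally {
          executor.shutdown();
        }
      }
    }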

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 728adfa..9e9dd46 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -331,8 +331,10 @@ public class TransactionProcessor extends BaseRegionObserver {
             conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
                          TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
           compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
-          LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
-                      + pruneTable);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+                        + pruneTable);
+          }
         }
       }
     }
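
The isDebugEnabled() guard above is the usual idiom when the message is built by string concatenation: the concatenation is skipped entirely unless debug logging is on. A minimal sketch (the table name is hypothetical, for illustration only):

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public class GuardedDebugSketch {
      private static final Log LOG = LogFactory.getLog(GuardedDebugSketch.class);

      static void logPruneTable(String pruneTable) {
        if (LOG.isDebugEnabled()) {
          // The string below is only built when debug logging is enabled.
          LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
                      + pruneTable);
        }
      }

      public static void main(String[] args) {
        logPruneTable("tephra.state.table");         // hypothetical table name
      }
    }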

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index 58596be..db7880b 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -38,27 +38,25 @@ import javax.annotation.Nullable;
 public class CompactionState {
   private static final Log LOG = LogFactory.getLog(CompactionState.class);
 
-  private final TableName stateTable;
   private final byte[] regionName;
   private final String regionNameAsString;
-  private final DataJanitorState dataJanitorState;
-  private final long pruneFlushInterval;
-  private volatile long pruneUpperBound = -1;
+  private final PruneUpperBoundWriterSupplier pruneUpperBoundWriterSupplier;
+  private final PruneUpperBoundWriter pruneUpperBoundWriter;
 
-  private PruneUpperBoundWriter pruneUpperBoundWriter;
+  private volatile long pruneUpperBound = -1;
 
   public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long pruneFlushInterval) {
-    this.stateTable = stateTable;
     this.regionName = env.getRegionInfo().getRegionName();
     this.regionNameAsString = env.getRegionInfo().getRegionNameAsString();
-    this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+    DataJanitorState dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public Table get() throws IOException {
         return env.getTable(stateTable);
       }
     });
-    this.pruneFlushInterval = pruneFlushInterval;
-    this.pruneUpperBoundWriter = createPruneUpperBoundWriter();
+    this.pruneUpperBoundWriterSupplier = new PruneUpperBoundWriterSupplier(stateTable, dataJanitorState,
+                                                                           pruneFlushInterval);
+    this.pruneUpperBoundWriter = pruneUpperBoundWriterSupplier.get();
   }
 
   /**
@@ -71,38 +69,33 @@ public class CompactionState {
     if (request.isMajor() && snapshot != null) {
       Transaction tx = TxUtils.createDummyTransaction(snapshot);
       pruneUpperBound = TxUtils.getPruneUpperBound(tx);
-      LOG.debug(
-        String.format("Computed prune upper bound %s for compaction request %s using transaction state from time %s",
-                      pruneUpperBound, request, snapshot.getTimestamp()));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          String.format("Computed prune upper bound %s for compaction request %s using transaction state from time %s",
+                        pruneUpperBound, request, snapshot.getTimestamp()));
+      }
     } else {
       pruneUpperBound = -1;
     }
   }
 
   /**
-   * Stops the current {@link PruneUpperBoundWriter}.
-   */
-  public void stop() {
-    if (pruneUpperBoundWriter != null) {
-      pruneUpperBoundWriter.stop();
-    }
-  }
-
-  /**
    * Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}.
    * This method is called after the compaction has successfully completed.
    */
   public void persist() {
     if (pruneUpperBound != -1) {
-      if (!pruneUpperBoundWriter.isAlive()) {
-        pruneUpperBoundWriter = createPruneUpperBoundWriter();
+      pruneUpperBoundWriter.persistPruneEntry(regionName, pruneUpperBound);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
       }
-      pruneUpperBoundWriter.persistPruneEntry(pruneUpperBound);
-      LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
     }
   }
 
-  private PruneUpperBoundWriter createPruneUpperBoundWriter() {
-    return new PruneUpperBoundWriter(dataJanitorState, stateTable, regionNameAsString, regionName, pruneFlushInterval);
+  /**
+   * Releases the usage of the {@link PruneUpperBoundWriter}.
+   */
+  public void stop() {
+    pruneUpperBoundWriterSupplier.release();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 7bceaff..7e9d1a3 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -18,57 +18,62 @@
 
 package org.apache.tephra.hbase.txprune;
 
+import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.hadoop.hbase.util.Bytes;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * Thread that will write the the prune upper bound
+ * Thread that will write the prune upper bound. An instance of this class should be obtained only
+ * through {@link PruneUpperBoundWriterSupplier} which will also handle the lifecycle of this instance.
  */
-public class PruneUpperBoundWriter {
-  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
+public class PruneUpperBoundWriter extends AbstractIdleService {
+  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
 
-  private final TableName pruneStateTable;
+  private final TableName tableName;
   private final DataJanitorState dataJanitorState;
-  private final byte[] regionName;
-  private final String regionNameAsString;
   private final long pruneFlushInterval;
-  private final AtomicLong pruneUpperBound;
-  private final AtomicBoolean shouldFlush;
+  private final ConcurrentSkipListMap<byte[], Long> pruneEntries;
+
+  private volatile Thread flushThread;
 
-  private Thread flushThread;
   private long lastChecked;
 
-  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString,
-                               byte[] regionName, long pruneFlushInterval) {
-    this.pruneStateTable = pruneStateTable;
+  public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) {
+    this.tableName = tableName;
     this.dataJanitorState = dataJanitorState;
-    this.regionName = regionName;
-    this.regionNameAsString = regionNameAsString;
     this.pruneFlushInterval = pruneFlushInterval;
-    this.pruneUpperBound = new AtomicLong();
-    this.shouldFlush = new AtomicBoolean(false);
-    startFlushThread();
+    this.pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+  }
+
+  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+    // The number of entries in this map is bounded by the number of regions in this region server and thus it will not
+    // grow indefinitely
+    pruneEntries.put(regionName, pruneUpperBound);
   }
 
   public boolean isAlive() {
-    return flushThread.isAlive();
+    return flushThread != null && flushThread.isAlive();
   }
 
-  public void persistPruneEntry(long pruneUpperBound) {
-    this.pruneUpperBound.set(pruneUpperBound);
-    this.shouldFlush.set(true);
+  @Override
+  protected void startUp() throws Exception {
+    LOG.info("Starting PruneUpperBoundWriter Thread.");
+    startFlushThread();
   }
 
-  public void stop() {
+  @Override
+  protected void shutDown() throws Exception {
+    LOG.info("Stopping PruneUpperBoundWriter Thread.");
     if (flushThread != null) {
       flushThread.interrupt();
+      flushThread.join(TimeUnit.SECONDS.toMillis(1));
     }
   }
 
@@ -76,19 +81,21 @@ public class PruneUpperBoundWriter {
     flushThread = new Thread("tephra-prune-upper-bound-writer") {
       @Override
       public void run() {
-        while (!isInterrupted()) {
+        while ((!isInterrupted()) && isRunning()) {
           long now = System.currentTimeMillis();
           if (now > (lastChecked + pruneFlushInterval)) {
-            if (shouldFlush.compareAndSet(true, false)) {
-              // should flush data
-              try {
-                dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound.get());
-              } catch (IOException ex) {
-                LOG.warn("Cannot record prune upper bound for region " + regionNameAsString + " in the table " +
-                           pruneStateTable.getNameWithNamespaceInclAsString() + " after compacting region.", ex);
-                // Retry again
-                shouldFlush.set(true);
+            // should flush data
+            try {
+              while (pruneEntries.firstEntry() != null) {
+                Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
+                dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
+                // We can now remove the entry only if the key and value match with what we wrote since it is
+                // possible that a new pruneUpperBound for the same key has been added
+                pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
               }
+            } catch (IOException ex) {
+              LOG.warn("Cannot record prune upper bound for a region to table " +
+                         tableName.getNameWithNamespaceInclAsString(), ex);
             }
             lastChecked = now;
           }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
new file mode 100644
index 0000000..98f3334
--- /dev/null
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+
+import com.google.common.base.Supplier;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+
+/**
+ * Supplies a shared, reference-counted instance of {@link PruneUpperBoundWriter}.
+ */
+public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> {
+  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class);
+
+  private static volatile PruneUpperBoundWriter instance;
+  private static volatile int refCount = 0;
+  private static final Object lock = new Object();
+
+  private final TableName tableName;
+  private final DataJanitorState dataJanitorState;
+  private final long pruneFlushInterval;
+
+  public PruneUpperBoundWriterSupplier(TableName tableName, DataJanitorState dataJanitorState,
+                                       long pruneFlushInterval) {
+    this.tableName = tableName;
+    this.dataJanitorState = dataJanitorState;
+    this.pruneFlushInterval = pruneFlushInterval;
+  }
+
+  @Override
+  public PruneUpperBoundWriter get() {
+    synchronized (lock) {
+      if (instance == null) {
+        instance = new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
+        instance.startAndWait();
+      }
+      refCount++;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " + refCount);
+      }
+      return instance;
+    }
+  }
+
+  public void release() {
+    synchronized (lock) {
+      refCount--;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Decrementing Reference Count for PruneUpperBoundWriter " + refCount);
+      }
+
+      if (refCount == 0) {
+        try {
+          instance.stopAndWait();
+        } catch (Exception ex) {
+          LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", ex);
+        } finally {
+          // If the thread is still alive (might happen if the thread was blocked on HBase PUT call), interrupt it again
+          if (instance.isAlive()) {
+            try {
+              instance.shutDown();
+            } catch (Exception e) {
+              LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", e);
+            }
+          }
+          instance = null;
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
new file mode 100644
index 0000000..08d3b49
--- /dev/null
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tests to verify the behavior of {@link PruneUpperBoundWriterSupplier}.
+ */
+public class PruneUpperBoundWriterSupplierTest {
+  private static final Logger LOG = LoggerFactory.getLogger(PruneUpperBoundWriterSupplierTest.class);
+  private static final int NUM_OPS = 10000;
+  private static final int NUM_THREADS = 50;
+
+  @Test
+  public void testSupplier() throws Exception {
+    final PruneUpperBoundWriterSupplier supplier = new PruneUpperBoundWriterSupplier(null, null, 10L);
+    // Get one instance now, for later comparisons
+    final PruneUpperBoundWriter writer = supplier.get();
+    final AtomicInteger numOps = new AtomicInteger(NUM_OPS);
+    final Random random = new Random(System.currentTimeMillis());
+
+    // Start threads that will 'get' PruneUpperBoundWriters
+    ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
+    List<Future> futureList = new ArrayList<>();
+    for (int i = 0; i < NUM_THREADS; i++) {
+      futureList.add(executor.submit(new Runnable() {
+
+        @Override
+        public void run() {
+          // Perform NUM_OPS 'gets' of PruneUpperBoundWriter
+          while (numOps.decrementAndGet() > 0) {
+            PruneUpperBoundWriter newWriter = supplier.get();
+            Assert.assertTrue(newWriter == writer);
+            int waitTime = random.nextInt(10);
+            try {
+              TimeUnit.MICROSECONDS.sleep(waitTime);
+            } catch (InterruptedException e) {
+              LOG.warn("Received an exception.", e);
+            }
+          }
+        }
+      }));
+    }
+
+    for (Future future : futureList) {
+      future.get(5, TimeUnit.SECONDS);
+    }
+    executor.shutdown();
+    executor.awaitTermination(2, TimeUnit.SECONDS);
+
+    futureList.clear();
+    numOps.set(NUM_OPS);
+    // Start threads that release PruneUpperBoundWriters
+    executor = Executors.newFixedThreadPool(NUM_THREADS);
+    for (int i = 0; i < NUM_THREADS; i++) {
+      futureList.add(executor.submit(new Runnable() {
+
+        @Override
+        public void run() {
+          // We need to release all NUM_OPS 'gets' that were executed to trigger shutdown of the single instance of
+          // PruneUpperBoundWriter
+          while (numOps.decrementAndGet() > 0) {
+            supplier.release();
+            try {
+              TimeUnit.MICROSECONDS.sleep(random.nextInt(10));
+            } catch (InterruptedException e) {
+              LOG.warn("Received an exception.", e);
+            }
+          }
+        }
+      }));
+    }
+
+    for (Future future : futureList) {
+      future.get(1, TimeUnit.SECONDS);
+    }
+
+    executor.shutdown();
+    executor.awaitTermination(2, TimeUnit.SECONDS);
+
+    // Verify that the PruneUpperBoundWriter is still running and the flush thread is still alive.
+    Assert.assertTrue(writer.isRunning());
+    Assert.assertTrue(writer.isAlive());
+
+    // Since we got one instance in the beginning, we need to release it
+    supplier.release();
+
+    // Verify that the PruneUpperBoundWriter is shut down and the flush thread is no longer alive.
+    Assert.assertFalse(writer.isRunning());
+    Assert.assertFalse(writer.isAlive());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 728adfa..9e9dd46 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -331,8 +331,10 @@ public class TransactionProcessor extends BaseRegionObserver {
             conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
                          TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
           compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
-          LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
-                      + pruneTable);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
+                        + pruneTable);
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index 58596be..db7880b 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -38,27 +38,25 @@ import javax.annotation.Nullable;
 public class CompactionState {
   private static final Log LOG = LogFactory.getLog(CompactionState.class);
 
-  private final TableName stateTable;
   private final byte[] regionName;
   private final String regionNameAsString;
-  private final DataJanitorState dataJanitorState;
-  private final long pruneFlushInterval;
-  private volatile long pruneUpperBound = -1;
+  private final PruneUpperBoundWriterSupplier pruneUpperBoundWriterSupplier;
+  private final PruneUpperBoundWriter pruneUpperBoundWriter;
 
-  private PruneUpperBoundWriter pruneUpperBoundWriter;
+  private volatile long pruneUpperBound = -1;
 
   public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long pruneFlushInterval) {
-    this.stateTable = stateTable;
     this.regionName = env.getRegionInfo().getRegionName();
     this.regionNameAsString = env.getRegionInfo().getRegionNameAsString();
-    this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+    DataJanitorState dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
       @Override
       public Table get() throws IOException {
         return env.getTable(stateTable);
       }
     });
-    this.pruneFlushInterval = pruneFlushInterval;
-    this.pruneUpperBoundWriter = createPruneUpperBoundWriter();
+    this.pruneUpperBoundWriterSupplier = new PruneUpperBoundWriterSupplier(stateTable, dataJanitorState,
+                                                                           pruneFlushInterval);
+    this.pruneUpperBoundWriter = pruneUpperBoundWriterSupplier.get();
   }
 
   /**
@@ -71,38 +69,33 @@ public class CompactionState {
     if (request.isMajor() && snapshot != null) {
       Transaction tx = TxUtils.createDummyTransaction(snapshot);
       pruneUpperBound = TxUtils.getPruneUpperBound(tx);
-      LOG.debug(
-        String.format("Computed prune upper bound %s for compaction request %s using transaction state from time %s",
-                      pruneUpperBound, request, snapshot.getTimestamp()));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          String.format("Computed prune upper bound %s for compaction request %s using transaction state from time %s",
+                        pruneUpperBound, request, snapshot.getTimestamp()));
+      }
     } else {
       pruneUpperBound = -1;
     }
   }
 
   /**
-   * Stops the current {@link PruneUpperBoundWriter}.
-   */
-  public void stop() {
-    if (pruneUpperBoundWriter != null) {
-      pruneUpperBoundWriter.stop();
-    }
-  }
-
-  /**
    * Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}.
    * This method is called after the compaction has successfully completed.
    */
   public void persist() {
     if (pruneUpperBound != -1) {
-      if (!pruneUpperBoundWriter.isAlive()) {
-        pruneUpperBoundWriter = createPruneUpperBoundWriter();
+      pruneUpperBoundWriter.persistPruneEntry(regionName, pruneUpperBound);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
       }
-      pruneUpperBoundWriter.persistPruneEntry(pruneUpperBound);
-      LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
     }
   }
 
-  private PruneUpperBoundWriter createPruneUpperBoundWriter() {
-    return new PruneUpperBoundWriter(dataJanitorState, stateTable, regionNameAsString, regionName, pruneFlushInterval);
+  /**
+   * Releases the usage of the {@link PruneUpperBoundWriter}.
+   */
+  public void stop() {
+    pruneUpperBoundWriterSupplier.release();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index c981e15..5a86b4a 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -18,57 +18,62 @@
 
 package org.apache.tephra.hbase.txprune;
 
+import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.hadoop.hbase.util.Bytes;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * Thread that will write the the prune upper bound
+ * Thread that will write the prune upper bound. An instance of this class should be obtained only
+ * through {@link PruneUpperBoundWriterSupplier} which will also handle the lifecycle of this instance.
  */
-public class PruneUpperBoundWriter {
-  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
+public class PruneUpperBoundWriter extends AbstractIdleService {
+  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
 
-  private final TableName pruneStateTable;
+  private final TableName tableName;
   private final DataJanitorState dataJanitorState;
-  private final byte[] regionName;
-  private final String regionNameAsString;
   private final long pruneFlushInterval;
-  private final AtomicLong pruneUpperBound;
-  private final AtomicBoolean shouldFlush;
+  private final ConcurrentSkipListMap<byte[], Long> pruneEntries;
+
+  private volatile Thread flushThread;
 
-  private Thread flushThread;
   private long lastChecked;
 
-  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString,
-                               byte[] regionName, long pruneFlushInterval) {
-    this.pruneStateTable = pruneStateTable;
+  public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) {
+    this.tableName = tableName;
     this.dataJanitorState = dataJanitorState;
-    this.regionName = regionName;
-    this.regionNameAsString = regionNameAsString;
     this.pruneFlushInterval = pruneFlushInterval;
-    this.pruneUpperBound = new AtomicLong();
-    this.shouldFlush = new AtomicBoolean(false);
-    startFlushThread();
+    this.pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+  }
+
+  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+    // The number of entries in this map is bounded by the number of regions in this region server and thus it will not
+    // grow indefinitely
+    pruneEntries.put(regionName, pruneUpperBound);
   }
 
   public boolean isAlive() {
-    return flushThread.isAlive();
+    return flushThread != null && flushThread.isAlive();
   }
 
-  public void persistPruneEntry(long pruneUpperBound) {
-    this.pruneUpperBound.set(pruneUpperBound);
-    this.shouldFlush.set(true);
+  @Override
+  protected void startUp() throws Exception {
+    LOG.info("Starting PruneUpperBoundWriter Thread.");
+    startFlushThread();
   }
 
-  public void stop() {
+  @Override
+  protected void shutDown() throws Exception {
+    LOG.info("Stopping PruneUpperBoundWriter Thread.");
     if (flushThread != null) {
       flushThread.interrupt();
+      flushThread.join(TimeUnit.SECONDS.toMillis(1));
     }
   }
 
@@ -76,20 +81,21 @@ public class PruneUpperBoundWriter {
     flushThread = new Thread("tephra-prune-upper-bound-writer") {
       @Override
       public void run() {
-        while (!isInterrupted()) {
+        while ((!isInterrupted()) && isRunning()) {
           long now = System.currentTimeMillis();
           if (now > (lastChecked + pruneFlushInterval)) {
-            if (shouldFlush.compareAndSet(true, false)) {
-              // should flush data
-              try {
-                dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound.get());
-              } catch (IOException ex) {
-                LOG.warn("Cannot record prune upper bound for region " + regionNameAsString + " in the table " +
-                           pruneStateTable.getNamespaceAsString() + ":" + pruneStateTable.getNameAsString() +
-                           " after compacting region.", ex);
-                // Retry again
-                shouldFlush.set(true);
+            // should flush data
+            try {
+              while (pruneEntries.firstEntry() != null) {
+                Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
+                dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
+                // We can now remove the entry only if the key and value match with what we wrote since it is
+                // possible that a new pruneUpperBound for the same key has been added
+                pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
               }
+            } catch (IOException ex) {
+              LOG.warn("Cannot record prune upper bound for a region to table " +
+                         tableName.getNamespaceAsString() + ":" + tableName.getNameAsString(), ex);
             }
             lastChecked = now;
           }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
new file mode 100644
index 0000000..98f3334
--- /dev/null
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+
+import com.google.common.base.Supplier;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+
+/**
+ * Supplies a shared, reference-counted instance of {@link PruneUpperBoundWriter}.
+ */
+public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> {
+  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class);
+
+  private static volatile PruneUpperBoundWriter instance;
+  private static volatile int refCount = 0;
+  private static final Object lock = new Object();
+
+  private final TableName tableName;
+  private final DataJanitorState dataJanitorState;
+  private final long pruneFlushInterval;
+
+  public PruneUpperBoundWriterSupplier(TableName tableName, DataJanitorState dataJanitorState,
+                                       long pruneFlushInterval) {
+    this.tableName = tableName;
+    this.dataJanitorState = dataJanitorState;
+    this.pruneFlushInterval = pruneFlushInterval;
+  }
+
+  @Override
+  public PruneUpperBoundWriter get() {
+    synchronized (lock) {
+      if (instance == null) {
+        instance = new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
+        instance.startAndWait();
+      }
+      refCount++;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " + refCount);
+      }
+      return instance;
+    }
+  }
+
+  public void release() {
+    synchronized (lock) {
+      refCount--;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Decrementing Reference Count for PruneUpperBoundWriter " + refCount);
+      }
+
+      if (refCount == 0) {
+        try {
+          instance.stopAndWait();
+        } catch (Exception ex) {
+          LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", ex);
+        } finally {
+          // If the thread is still alive (might happen if the thread was blocked on HBase PUT call), interrupt it again
+          if (instance.isAlive()) {
+            try {
+              instance.shutDown();
+            } catch (Exception e) {
+              LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", e);
+            }
+          }
+          instance = null;
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
new file mode 100644
index 0000000..08d3b49
--- /dev/null
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tests to verify the behavior of {@link PruneUpperBoundWriterSupplier}.
+ */
+public class PruneUpperBoundWriterSupplierTest {
+  private static final Logger LOG = LoggerFactory.getLogger(PruneUpperBoundWriterSupplierTest.class);
+  private static final int NUM_OPS = 10000;
+  private static final int NUM_THREADS = 50;
+
+  @Test
+  public void testSupplier() throws Exception {
+    final PruneUpperBoundWriterSupplier supplier = new PruneUpperBoundWriterSupplier(null, null, 10L);
+    // Get one instance now, for later comparisons
+    final PruneUpperBoundWriter writer = supplier.get();
+    final AtomicInteger numOps = new AtomicInteger(NUM_OPS);
+    final Random random = new Random(System.currentTimeMillis());
+
+    // Start threads that will 'get' PruneUpperBoundWriters
+    ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
+    List<Future> futureList = new ArrayList<>();
+    for (int i = 0; i < NUM_THREADS; i++) {
+      futureList.add(executor.submit(new Runnable() {
+
+        @Override
+        public void run() {
+          // Perform NUM_OPS 'gets' of PruneUpperBoundWriter
+          while (numOps.decrementAndGet() > 0) {
+            PruneUpperBoundWriter newWriter = supplier.get();
+            Assert.assertTrue(newWriter == writer);
+            int waitTime = random.nextInt(10);
+            try {
+              TimeUnit.MICROSECONDS.sleep(waitTime);
+            } catch (InterruptedException e) {
+              LOG.warn("Received an exception.", e);
+            }
+          }
+        }
+      }));
+    }
+
+    for (Future future : futureList) {
+      future.get(5, TimeUnit.SECONDS);
+    }
+    executor.shutdown();
+    executor.awaitTermination(2, TimeUnit.SECONDS);
+
+    futureList.clear();
+    numOps.set(NUM_OPS);
+    // Start threads that release PruneUpperBoundWriters
+    executor = Executors.newFixedThreadPool(NUM_THREADS);
+    for (int i = 0; i < NUM_THREADS; i++) {
+      futureList.add(executor.submit(new Runnable() {
+
+        @Override
+        public void run() {
+          // We need to release all NUM_OPS 'gets' that were executed to trigger shutdown of the single instance of
+          // PruneUpperBoundWriter
+          while (numOps.decrementAndGet() > 0) {
+            supplier.release();
+            try {
+              TimeUnit.MICROSECONDS.sleep(random.nextInt(10));
+            } catch (InterruptedException e) {
+              LOG.warn("Received an exception.", e);
+            }
+          }
+        }
+      }));
+    }
+
+    for (Future future : futureList) {
+      future.get(1, TimeUnit.SECONDS);
+    }
+
+    executor.shutdown();
+    executor.awaitTermination(2, TimeUnit.SECONDS);
+
+    // Verify that the PruneUpperBoundWriter is still running and the flush thread is still alive.
+    Assert.assertTrue(writer.isRunning());
+    Assert.assertTrue(writer.isAlive());
+
+    // Since we got one instance in the beginning, we need to release it
+    supplier.release();
+
+    // Verify that the PruneUpperBoundWriter is shut down and the flush thread is no longer alive.
+    Assert.assertFalse(writer.isRunning());
+    Assert.assertFalse(writer.isAlive());
+  }
+}