You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by go...@apache.org on 2017/02/14 05:40:28 UTC

incubator-tephra git commit: TEPHRA-219 execute cross region calls in coprocessor as the login user.

Repository: incubator-tephra
Updated Branches:
  refs/heads/master 2af5ac2bd -> 203825e29


TEPHRA-219 execute cross region calls in coprocessor as the login user.

This closes #35 from GitHub.
Make pruneEnable and txMaxLifetimeMillis volatile so that derived
classes can make use of it.

Introduced stopped variable in PruneUpperBoundWriter.

Signed-off-by: Gokul Gunasekaran <go...@cask.co>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/203825e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/203825e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/203825e2

Branch: refs/heads/master
Commit: 203825e29af43b24029ef83dc512a4b8a3deeadc
Parents: 2af5ac2
Author: Gokul Gunasekaran <go...@cask.co>
Authored: Sat Feb 11 19:42:02 2017 -0800
Committer: Gokul Gunasekaran <go...@cask.co>
Committed: Mon Feb 13 21:40:11 2017 -0800

----------------------------------------------------------------------
 .../hbase/coprocessor/TransactionProcessor.java | 95 ++++++++++---------
 .../hbase/txprune/PruneUpperBoundWriter.java    | 58 ++++++++----
 .../hbase/coprocessor/TransactionProcessor.java | 97 +++++++++++---------
 .../hbase/txprune/PruneUpperBoundWriter.java    | 44 +++++----
 .../hbase/coprocessor/TransactionProcessor.java | 97 +++++++++++---------
 .../hbase/txprune/PruneUpperBoundWriter.java    | 44 +++++----
 .../hbase/coprocessor/TransactionProcessor.java | 97 +++++++++++---------
 .../hbase/txprune/PruneUpperBoundWriter.java    | 44 +++++----
 .../hbase/coprocessor/TransactionProcessor.java | 97 +++++++++++---------
 .../hbase/txprune/PruneUpperBoundWriter.java    | 44 +++++----
 10 files changed, 405 insertions(+), 312 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 9ff4d3b..d2402a6 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -107,12 +107,13 @@ public class TransactionProcessor extends BaseRegionObserver {
 
   private final TransactionCodec txCodec;
   private TransactionStateCache cache;
-  private CompactionState compactionState;
+  private volatile CompactionState compactionState;
+
+  protected volatile Boolean pruneEnable;
+  protected volatile Long txMaxLifetimeMillis;
   protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
   protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
   protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;
-  protected Long txMaxLifetimeMillis;
-  private Boolean pruneEnable;
 
   public TransactionProcessor() {
     this.txCodec = new TransactionCodec();
@@ -144,10 +145,12 @@ public class TransactionProcessor extends BaseRegionObserver {
       }
 
       this.allowEmptyValues = getAllowEmptyValues(env, tableDesc);
+      this.txMaxLifetimeMillis = getTxMaxLifetimeMillis(env);
       this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA));
       if (readNonTxnData) {
         LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString());
       }
+      initializePruneState(env);
     }
   }
 
@@ -171,9 +174,7 @@ public class TransactionProcessor extends BaseRegionObserver {
 
   @Override
   public void stop(CoprocessorEnvironment e) throws IOException {
-    if (compactionState != null) {
-      compactionState.stop();
-    }
+    resetPruneState();
   }
 
   @Override
@@ -248,6 +249,15 @@ public class TransactionProcessor extends BaseRegionObserver {
       Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig;
   }
 
+  private long getTxMaxLifetimeMillis(RegionCoprocessorEnvironment env) {
+    Configuration conf = getConfiguration(env);
+    if (conf != null) {
+      return TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
+                                                   TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
+    }
+    return TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME);
+  }
+
   private boolean isFamilyDelete(List<Cell> familyCells) {
     return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0));
   }
@@ -321,15 +331,10 @@ public class TransactionProcessor extends BaseRegionObserver {
     LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s",
                             region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles));
     if (memstoreSize == 0 && numStoreFiles == 0) {
-      if (pruneEnable == null) {
-        initPruneState(e);
-      }
-
-      if (Boolean.TRUE.equals(pruneEnable)) {
+      if (compactionState != null) {
         compactionState.persistRegionEmpty(System.currentTimeMillis());
       }
     }
-
   }
 
   @Override
@@ -340,12 +345,8 @@ public class TransactionProcessor extends BaseRegionObserver {
     // Get the latest tx snapshot state for the compaction
     TransactionVisibilityState snapshot = cache.getLatestState();
 
-    if (pruneEnable == null) {
-      initPruneState(c);
-    }
-
-    if (Boolean.TRUE.equals(pruneEnable)) {
-      // Record tx state before the compaction
+    // Record tx state before the compaction
+    if (compactionState != null) {
       compactionState.record(request, snapshot);
     }
 
@@ -416,21 +417,8 @@ public class TransactionProcessor extends BaseRegionObserver {
       return;
     }
 
-    if (txMaxLifetimeMillis == null) {
-      Configuration conf = getConfiguration(env);
-      // Configuration won't be null in TransactionProcessor but the derived classes might return
-      // null if it is not available temporarily
-      if (conf != null) {
-        this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
-                                                                         TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
-      } else {
-        throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " +
-                                              "unavailable. Please retry the operation."));
-      }
-    }
-
     boolean validLifetime =
-      TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis();
+      (TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis) > System.currentTimeMillis();
     if (!validLifetime) {
       throw new DoNotRetryIOException(String.format("Transaction %s has exceeded max lifetime %s ms",
                                                     tx.getTransactionId(), txMaxLifetimeMillis));
@@ -454,28 +442,47 @@ public class TransactionProcessor extends BaseRegionObserver {
     return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
   }
 
-  private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) {
-    Configuration conf = getConfiguration(c.getEnvironment());
-    // Configuration won't be null in TransactionProcessor but the derived classes might return
-    // null if it is not available temporarily
+  /**
+   * Refresh the properties related to transaction pruning. This method needs to be invoked if there is change in the
+   * prune related properties after clearing the state by calling {@link #resetPruneState}.
+   *
+   * @param env {@link RegionCoprocessorEnvironment} of this region
+   */
+  protected void initializePruneState(RegionCoprocessorEnvironment env) {
+    Configuration conf = getConfiguration(env);
     if (conf != null) {
       pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+
       if (Boolean.TRUE.equals(pruneEnable)) {
-        String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
-        long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
-          conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
-                       TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
-        compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+        TableName pruneTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                                          TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+        long pruneFlushInterval = TimeUnit.SECONDS.toMillis(conf.getLong(
+          TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+          TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+
+        compactionState = new CompactionState(env, pruneTable, pruneFlushInterval);
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
-                      + pruneTable);
+          TableName name = env.getRegion().getRegionInfo().getTable();
+          LOG.debug(String.format("Automatic invalid list pruning is enabled for table %s:%s. Compaction state will " +
+                                    "be recorded in table %s:%s", name.getNamespaceAsString(), name.getNameAsString(),
+                                  pruneTable.getNamespaceAsString(), pruneTable.getNameAsString()));
         }
       }
     }
   }
 
+  /**
+   * Stop and clear state related to pruning.
+   */
+  protected void resetPruneState() {
+    pruneEnable = false;
+    if (compactionState != null) {
+      compactionState.stop();
+      compactionState = null;
+    }
+  }
+
   private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) {
     long numStoreFiles = 0;
     for (Store store : c.getEnvironment().getRegion().getStores().values()) {

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index beed1ad..7e4a0fa 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -23,8 +23,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.security.UserGroupInformation;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
@@ -45,6 +47,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
   private final ConcurrentSkipListMap<byte[], Long> emptyRegions;
 
   private volatile Thread flushThread;
+  private volatile boolean stopped;
 
   private long lastChecked;
 
@@ -87,6 +90,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
   @Override
   protected void shutDown() throws Exception {
     LOG.info("Stopping PruneUpperBoundWriter Thread.");
+    stopped = true;
     if (flushThread != null) {
       flushThread.interrupt();
       flushThread.join(TimeUnit.SECONDS.toMillis(1));
@@ -97,30 +101,36 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
     flushThread = new Thread("tephra-prune-upper-bound-writer") {
       @Override
       public void run() {
-        while ((!isInterrupted()) && isRunning()) {
+        while ((!isInterrupted()) && (!stopped)) {
           long now = System.currentTimeMillis();
           if (now > (lastChecked + pruneFlushInterval)) {
             // should flush data
             try {
-              // Record prune upper bound
-              while (!pruneEntries.isEmpty()) {
-                Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
-                dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
-                // We can now remove the entry only if the key and value match with what we wrote since it is
-                // possible that a new pruneUpperBound for the same key has been added
-                pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
-              }
-              // Record empty regions
-              while (!emptyRegions.isEmpty()) {
-                Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
-                dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
-                // We can now remove the entry only if the key and value match with what we wrote since it is
-                // possible that a new value for the same key has been added
-                emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
-              }
-            } catch (IOException ex) {
-              LOG.warn("Cannot record prune upper bound for a region to table " +
-                         tableName.getNamespaceAsString() + ":" + tableName.getNameAsString(), ex);
+              UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                  // Record prune upper bound
+                  while (!pruneEntries.isEmpty()) {
+                    Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
+                    dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
+                    // We can now remove the entry only if the key and value match with what we wrote since it is
+                    // possible that a new pruneUpperBound for the same key has been added
+                    pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
+                  }
+                  // Record empty regions
+                  while (!emptyRegions.isEmpty()) {
+                    Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
+                    dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
+                    // We can now remove the entry only if the key and value match with what we wrote since it is
+                    // possible that a new value for the same key has been added
+                    emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
+                  }
+                  return null;
+                }
+              });
+            } catch (IOException | InterruptedException ex) {
+              // Handle any exception that might be thrown during HBase operation
+              handleException(ex);
             }
             lastChecked = now;
           }
@@ -147,4 +157,12 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
                              Bytes.toStringBinary(regionName), isRunning() ? "alive" : "running"));
     }
   }
+
+  private void handleException(Exception ex) {
+    LOG.warn("Cannot record prune upper bound for a region to table " +
+               tableName.getNamespaceAsString() + ":" + tableName.getNameAsString(), ex);
+    if (ex instanceof IOException) {
+      Thread.currentThread().interrupt();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 7485b91..84776cf 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -107,12 +107,13 @@ public class TransactionProcessor extends BaseRegionObserver {
 
   private final TransactionCodec txCodec;
   private TransactionStateCache cache;
-  private CompactionState compactionState;
+  private volatile CompactionState compactionState;
+
+  protected volatile Boolean pruneEnable;
+  protected volatile Long txMaxLifetimeMillis;
   protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
   protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
   protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;
-  protected Long txMaxLifetimeMillis;
-  private Boolean pruneEnable;
 
   public TransactionProcessor() {
     this.txCodec = new TransactionCodec();
@@ -144,10 +145,12 @@ public class TransactionProcessor extends BaseRegionObserver {
       }
 
       this.allowEmptyValues = getAllowEmptyValues(env, tableDesc);
+      this.txMaxLifetimeMillis = getTxMaxLifetimeMillis(env);
       this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA));
       if (readNonTxnData) {
         LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString());
       }
+      initializePruneState(env);
     }
   }
 
@@ -171,9 +174,7 @@ public class TransactionProcessor extends BaseRegionObserver {
 
   @Override
   public void stop(CoprocessorEnvironment e) throws IOException {
-    if (compactionState != null) {
-      compactionState.stop();
-    }
+    resetPruneState();
   }
 
   @Override
@@ -248,6 +249,15 @@ public class TransactionProcessor extends BaseRegionObserver {
       Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig;
   }
 
+  private long getTxMaxLifetimeMillis(RegionCoprocessorEnvironment env) {
+    Configuration conf = getConfiguration(env);
+    if (conf != null) {
+      return TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
+                                                   TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
+    }
+    return TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME);
+  }
+
   private boolean isFamilyDelete(List<Cell> familyCells) {
     return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0));
   }
@@ -321,15 +331,10 @@ public class TransactionProcessor extends BaseRegionObserver {
     LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s",
                             region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles));
     if (memstoreSize == 0 && numStoreFiles == 0) {
-      if (pruneEnable == null) {
-        initPruneState(e);
-      }
-
-      if (Boolean.TRUE.equals(pruneEnable)) {
+      if (compactionState != null) {
         compactionState.persistRegionEmpty(System.currentTimeMillis());
       }
     }
-
   }
 
   @Override
@@ -340,12 +345,8 @@ public class TransactionProcessor extends BaseRegionObserver {
     // Get the latest tx snapshot state for the compaction
     TransactionVisibilityState snapshot = cache.getLatestState();
 
-    if (pruneEnable == null) {
-      initPruneState(c);
-    }
-
-    if (Boolean.TRUE.equals(pruneEnable)) {
-      // Record tx state before the compaction
+    // Record tx state before the compaction
+    if (compactionState != null) {
       compactionState.record(request, snapshot);
     }
 
@@ -356,7 +357,7 @@ public class TransactionProcessor extends BaseRegionObserver {
   @Override
   public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile,
                           CompactionRequest request) throws IOException {
-    // Persist the compaction state after a succesful compaction
+    // Persist the compaction state after a successful compaction
     if (compactionState != null) {
       compactionState.persist();
     }
@@ -416,21 +417,8 @@ public class TransactionProcessor extends BaseRegionObserver {
       return;
     }
 
-    if (txMaxLifetimeMillis == null) {
-      Configuration conf = getConfiguration(env);
-      // Configuration won't be null in TransactionProcessor but the derived classes might return
-      // null if it is not available temporarily
-      if (conf != null) {
-        this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
-                                                                         TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
-      } else {
-        throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " +
-                                              "unavailable. Please retry the operation."));
-      }
-    }
-
     boolean validLifetime =
-      TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis();
+      (TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis) > System.currentTimeMillis();
     if (!validLifetime) {
       throw new DoNotRetryIOException(String.format("Transaction %s has exceeded max lifetime %s ms",
                                                     tx.getTransactionId(), txMaxLifetimeMillis));
@@ -454,28 +442,47 @@ public class TransactionProcessor extends BaseRegionObserver {
     return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
   }
 
-  private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) {
-    Configuration conf = getConfiguration(c.getEnvironment());
-    // Configuration won't be null in TransactionProcessor but the derived classes might return
-    // null if it is not available temporarily
+  /**
+   * Refresh the properties related to transaction pruning. This method needs to be invoked if there is change in the
+   * prune related properties after clearing the state by calling {@link #resetPruneState}.
+   *
+   * @param env {@link RegionCoprocessorEnvironment} of this region
+   */
+  protected void initializePruneState(RegionCoprocessorEnvironment env) {
+    Configuration conf = getConfiguration(env);
     if (conf != null) {
       pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+
       if (Boolean.TRUE.equals(pruneEnable)) {
-        String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
-        long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
-          conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
-                       TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
-        compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+        TableName pruneTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                                          TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+        long pruneFlushInterval = TimeUnit.SECONDS.toMillis(conf.getLong(
+          TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+          TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+
+        compactionState = new CompactionState(env, pruneTable, pruneFlushInterval);
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
-                      + pruneTable);
+          TableName name = env.getRegion().getRegionInfo().getTable();
+          LOG.debug(String.format("Automatic invalid list pruning is enabled for table %s:%s. Compaction state will " +
+                                    "be recorded in table %s:%s", name.getNamespaceAsString(), name.getNameAsString(),
+                                  pruneTable.getNamespaceAsString(), pruneTable.getNameAsString()));
         }
       }
     }
   }
 
+  /**
+   * Stop and clear state related to pruning.
+   */
+  protected void resetPruneState() {
+    pruneEnable = false;
+    if (compactionState != null) {
+      compactionState.stop();
+      compactionState = null;
+    }
+  }
+
   private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) {
     long numStoreFiles = 0;
     for (Store store : c.getEnvironment().getRegion().getStores().values()) {

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index beed1ad..38c1a6f 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -22,9 +22,11 @@ import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
@@ -45,6 +47,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
   private final ConcurrentSkipListMap<byte[], Long> emptyRegions;
 
   private volatile Thread flushThread;
+  private volatile boolean stopped;
 
   private long lastChecked;
 
@@ -87,6 +90,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
   @Override
   protected void shutDown() throws Exception {
     LOG.info("Stopping PruneUpperBoundWriter Thread.");
+    stopped = true;
     if (flushThread != null) {
       flushThread.interrupt();
       flushThread.join(TimeUnit.SECONDS.toMillis(1));
@@ -97,27 +101,33 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
     flushThread = new Thread("tephra-prune-upper-bound-writer") {
       @Override
       public void run() {
-        while ((!isInterrupted()) && isRunning()) {
+        while ((!isInterrupted()) && (!stopped)) {
           long now = System.currentTimeMillis();
           if (now > (lastChecked + pruneFlushInterval)) {
             // should flush data
             try {
-              // Record prune upper bound
-              while (!pruneEntries.isEmpty()) {
-                Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
-                dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
-                // We can now remove the entry only if the key and value match with what we wrote since it is
-                // possible that a new pruneUpperBound for the same key has been added
-                pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
-              }
-              // Record empty regions
-              while (!emptyRegions.isEmpty()) {
-                Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
-                dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
-                // We can now remove the entry only if the key and value match with what we wrote since it is
-                // possible that a new value for the same key has been added
-                emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
-              }
+              User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                  // Record prune upper bound
+                  while (!pruneEntries.isEmpty()) {
+                    Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
+                    dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
+                    // We can now remove the entry only if the key and value match with what we wrote since it is
+                    // possible that a new pruneUpperBound for the same key has been added
+                    pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
+                  }
+                  // Record empty regions
+                  while (!emptyRegions.isEmpty()) {
+                    Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
+                    dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
+                    // We can now remove the entry only if the key and value match with what we wrote since it is
+                    // possible that a new value for the same key has been added
+                    emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
+                  }
+                  return null;
+                }
+              });
             } catch (IOException ex) {
               LOG.warn("Cannot record prune upper bound for a region to table " +
                          tableName.getNamespaceAsString() + ":" + tableName.getNameAsString(), ex);

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 7485b91..b73bdc1 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -107,12 +107,13 @@ public class TransactionProcessor extends BaseRegionObserver {
 
   private final TransactionCodec txCodec;
   private TransactionStateCache cache;
-  private CompactionState compactionState;
+  private volatile CompactionState compactionState;
+
+  protected volatile Boolean pruneEnable;
+  protected volatile Long txMaxLifetimeMillis;
   protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
   protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
   protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;
-  protected Long txMaxLifetimeMillis;
-  private Boolean pruneEnable;
 
   public TransactionProcessor() {
     this.txCodec = new TransactionCodec();
@@ -144,10 +145,12 @@ public class TransactionProcessor extends BaseRegionObserver {
       }
 
       this.allowEmptyValues = getAllowEmptyValues(env, tableDesc);
+      this.txMaxLifetimeMillis = getTxMaxLifetimeMillis(env);
       this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA));
       if (readNonTxnData) {
         LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString());
       }
+      initializePruneState(env);
     }
   }
 
@@ -171,9 +174,7 @@ public class TransactionProcessor extends BaseRegionObserver {
 
   @Override
   public void stop(CoprocessorEnvironment e) throws IOException {
-    if (compactionState != null) {
-      compactionState.stop();
-    }
+    resetPruneState();
   }
 
   @Override
@@ -248,6 +249,15 @@ public class TransactionProcessor extends BaseRegionObserver {
       Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig;
   }
 
+  private long getTxMaxLifetimeMillis(RegionCoprocessorEnvironment env) {
+    Configuration conf = getConfiguration(env);
+    if (conf != null) {
+      return TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
+                                                   TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
+    }
+    return TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME);
+  }
+
   private boolean isFamilyDelete(List<Cell> familyCells) {
     return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0));
   }
@@ -321,15 +331,10 @@ public class TransactionProcessor extends BaseRegionObserver {
     LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s",
                             region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles));
     if (memstoreSize == 0 && numStoreFiles == 0) {
-      if (pruneEnable == null) {
-        initPruneState(e);
-      }
-
-      if (Boolean.TRUE.equals(pruneEnable)) {
+      if (compactionState != null) {
         compactionState.persistRegionEmpty(System.currentTimeMillis());
       }
     }
-
   }
 
   @Override
@@ -340,12 +345,8 @@ public class TransactionProcessor extends BaseRegionObserver {
     // Get the latest tx snapshot state for the compaction
     TransactionVisibilityState snapshot = cache.getLatestState();
 
-    if (pruneEnable == null) {
-      initPruneState(c);
-    }
-
-    if (Boolean.TRUE.equals(pruneEnable)) {
-      // Record tx state before the compaction
+    // Record tx state before the compaction
+    if (compactionState != null) {
       compactionState.record(request, snapshot);
     }
 
@@ -356,7 +357,7 @@ public class TransactionProcessor extends BaseRegionObserver {
   @Override
   public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile,
                           CompactionRequest request) throws IOException {
-    // Persist the compaction state after a succesful compaction
+    // Persist the compaction state after a successful compaction
     if (compactionState != null) {
       compactionState.persist();
     }
@@ -416,21 +417,8 @@ public class TransactionProcessor extends BaseRegionObserver {
       return;
     }
 
-    if (txMaxLifetimeMillis == null) {
-      Configuration conf = getConfiguration(env);
-      // Configuration won't be null in TransactionProcessor but the derived classes might return
-      // null if it is not available temporarily
-      if (conf != null) {
-        this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
-                                                                         TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
-      } else {
-        throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " +
-                                              "unavailable. Please retry the operation."));
-      }
-    }
-
     boolean validLifetime =
-      TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis();
+      (TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis) > System.currentTimeMillis();
     if (!validLifetime) {
       throw new DoNotRetryIOException(String.format("Transaction %s has exceeded max lifetime %s ms",
                                                     tx.getTransactionId(), txMaxLifetimeMillis));
@@ -454,28 +442,47 @@ public class TransactionProcessor extends BaseRegionObserver {
     return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
   }
 
-  private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) {
-    Configuration conf = getConfiguration(c.getEnvironment());
-    // Configuration won't be null in TransactionProcessor but the derived classes might return
-    // null if it is not available temporarily
+  /**
+   * Refresh the properties related to transaction pruning. This method needs to be invoked if there is change in the
+   * prune related properties after clearing the state by calling {@link #resetPruneState}.
+   *
+   * @param env {@link RegionCoprocessorEnvironment} of this region
+   */
+  protected void initializePruneState(RegionCoprocessorEnvironment env) {
+    Configuration conf = getConfiguration(env);
     if (conf != null) {
       pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+
       if (Boolean.TRUE.equals(pruneEnable)) {
-        String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
-        long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
-          conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
-                       TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
-        compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+        TableName pruneTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                                          TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+        long pruneFlushInterval = TimeUnit.SECONDS.toMillis(conf.getLong(
+          TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+          TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+
+        compactionState = new CompactionState(env, pruneTable, pruneFlushInterval);
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
-                      + pruneTable);
+          LOG.debug(String.format("Automatic invalid list pruning is enabled for table %s. Compaction state " +
+                                    "will be recorded in table %s",
+                                  env.getRegionInfo().getTable().getNameWithNamespaceInclAsString(),
+                                  pruneTable.getNameWithNamespaceInclAsString()));
         }
       }
     }
   }
 
+  /**
+   * Stop and clear state related to pruning.
+   */
+  protected void resetPruneState() {
+    pruneEnable = false;
+    if (compactionState != null) {
+      compactionState.stop();
+      compactionState = null;
+    }
+  }
+
   private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) {
     long numStoreFiles = 0;
     for (Store store : c.getEnvironment().getRegion().getStores().values()) {

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 9773a15..6bd8bab 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -22,9 +22,11 @@ import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
@@ -45,6 +47,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
   private final ConcurrentSkipListMap<byte[], Long> emptyRegions;
 
   private volatile Thread flushThread;
+  private volatile boolean stopped;
 
   private long lastChecked;
 
@@ -87,6 +90,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
   @Override
   protected void shutDown() throws Exception {
     LOG.info("Stopping PruneUpperBoundWriter Thread.");
+    stopped = true;
     if (flushThread != null) {
       flushThread.interrupt();
       flushThread.join(TimeUnit.SECONDS.toMillis(1));
@@ -97,27 +101,33 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
     flushThread = new Thread("tephra-prune-upper-bound-writer") {
       @Override
       public void run() {
-        while ((!isInterrupted()) && isRunning()) {
+        while ((!isInterrupted()) && (!stopped)) {
           long now = System.currentTimeMillis();
           if (now > (lastChecked + pruneFlushInterval)) {
             // should flush data
             try {
-              // Record prune upper bound
-              while (!pruneEntries.isEmpty()) {
-                Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
-                dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
-                // We can now remove the entry only if the key and value match with what we wrote since it is
-                // possible that a new pruneUpperBound for the same key has been added
-                pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
-              }
-              // Record empty regions
-              while (!emptyRegions.isEmpty()) {
-                Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
-                dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
-                // We can now remove the entry only if the key and value match with what we wrote since it is
-                // possible that a new value for the same key has been added
-                emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
-              }
+              User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                  // Record prune upper bound
+                  while (!pruneEntries.isEmpty()) {
+                    Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
+                    dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
+                    // We can now remove the entry only if the key and value match with what we wrote since it is
+                    // possible that a new pruneUpperBound for the same key has been added
+                    pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
+                  }
+                  // Record empty regions
+                  while (!emptyRegions.isEmpty()) {
+                    Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
+                    dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
+                    // We can now remove the entry only if the key and value match with what we wrote since it is
+                    // possible that a new value for the same key has been added
+                    emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
+                  }
+                  return null;
+                }
+              });
             } catch (IOException ex) {
               LOG.warn("Cannot record prune upper bound for a region to table " +
                          tableName.getNameWithNamespaceInclAsString(), ex);

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 7485b91..f9bb35e 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -107,12 +107,13 @@ public class TransactionProcessor extends BaseRegionObserver {
 
   private final TransactionCodec txCodec;
   private TransactionStateCache cache;
-  private CompactionState compactionState;
+  private volatile CompactionState compactionState;
+
+  protected volatile Boolean pruneEnable;
+  protected volatile Long txMaxLifetimeMillis;
   protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
   protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
   protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;
-  protected Long txMaxLifetimeMillis;
-  private Boolean pruneEnable;
 
   public TransactionProcessor() {
     this.txCodec = new TransactionCodec();
@@ -144,10 +145,12 @@ public class TransactionProcessor extends BaseRegionObserver {
       }
 
       this.allowEmptyValues = getAllowEmptyValues(env, tableDesc);
+      this.txMaxLifetimeMillis = getTxMaxLifetimeMillis(env);
       this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA));
       if (readNonTxnData) {
         LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString());
       }
+      initializePruneState(env);
     }
   }
 
@@ -171,9 +174,7 @@ public class TransactionProcessor extends BaseRegionObserver {
 
   @Override
   public void stop(CoprocessorEnvironment e) throws IOException {
-    if (compactionState != null) {
-      compactionState.stop();
-    }
+    resetPruneState();
   }
 
   @Override
@@ -248,6 +249,15 @@ public class TransactionProcessor extends BaseRegionObserver {
       Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig;
   }
 
+  private long getTxMaxLifetimeMillis(RegionCoprocessorEnvironment env) {
+    Configuration conf = getConfiguration(env);
+    if (conf != null) {
+      return TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
+                                                   TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
+    }
+    return TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME);
+  }
+
   private boolean isFamilyDelete(List<Cell> familyCells) {
     return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0));
   }
@@ -321,15 +331,10 @@ public class TransactionProcessor extends BaseRegionObserver {
     LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s",
                             region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles));
     if (memstoreSize == 0 && numStoreFiles == 0) {
-      if (pruneEnable == null) {
-        initPruneState(e);
-      }
-
-      if (Boolean.TRUE.equals(pruneEnable)) {
+      if (compactionState != null) {
         compactionState.persistRegionEmpty(System.currentTimeMillis());
       }
     }
-
   }
 
   @Override
@@ -340,12 +345,8 @@ public class TransactionProcessor extends BaseRegionObserver {
     // Get the latest tx snapshot state for the compaction
     TransactionVisibilityState snapshot = cache.getLatestState();
 
-    if (pruneEnable == null) {
-      initPruneState(c);
-    }
-
-    if (Boolean.TRUE.equals(pruneEnable)) {
-      // Record tx state before the compaction
+    // Record tx state before the compaction
+    if (compactionState != null) {
       compactionState.record(request, snapshot);
     }
 
@@ -356,7 +357,7 @@ public class TransactionProcessor extends BaseRegionObserver {
   @Override
   public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile,
                           CompactionRequest request) throws IOException {
-    // Persist the compaction state after a succesful compaction
+    // Persist the compaction state after a successful compaction
     if (compactionState != null) {
       compactionState.persist();
     }
@@ -416,21 +417,8 @@ public class TransactionProcessor extends BaseRegionObserver {
       return;
     }
 
-    if (txMaxLifetimeMillis == null) {
-      Configuration conf = getConfiguration(env);
-      // Configuration won't be null in TransactionProcessor but the derived classes might return
-      // null if it is not available temporarily
-      if (conf != null) {
-        this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
-                                                                         TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
-      } else {
-        throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " +
-                                              "unavailable. Please retry the operation."));
-      }
-    }
-
     boolean validLifetime =
-      TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis();
+      (TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis) > System.currentTimeMillis();
     if (!validLifetime) {
       throw new DoNotRetryIOException(String.format("Transaction %s has exceeded max lifetime %s ms",
                                                     tx.getTransactionId(), txMaxLifetimeMillis));
@@ -454,28 +442,47 @@ public class TransactionProcessor extends BaseRegionObserver {
     return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
   }
 
-  private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) {
-    Configuration conf = getConfiguration(c.getEnvironment());
-    // Configuration won't be null in TransactionProcessor but the derived classes might return
-    // null if it is not available temporarily
+  /**
+   * Refresh the properties related to transaction pruning. This method needs to be invoked if there is change in the
+   * prune related properties after clearing the state by calling {@link #resetPruneState}.
+   *
+   * @param env {@link RegionCoprocessorEnvironment} of this region
+   */
+  protected void initializePruneState(RegionCoprocessorEnvironment env) {
+    Configuration conf = getConfiguration(env);
     if (conf != null) {
       pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+
       if (Boolean.TRUE.equals(pruneEnable)) {
-        String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
-        long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
-          conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
-                       TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
-        compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+        TableName pruneTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                                          TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+        long pruneFlushInterval = TimeUnit.SECONDS.toMillis(conf.getLong(
+          TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+          TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+
+        compactionState = new CompactionState(env, pruneTable, pruneFlushInterval);
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
-                      + pruneTable);
+          LOG.debug(String.format("Automatic invalid list pruning is enabled for table %s:%s. Compaction state will " +
+                                    "be recorded in table %s:%s", env.getRegionInfo().getTable().getNamespaceAsString(),
+                                  env.getRegionInfo().getTable().getNameAsString(), pruneTable.getNamespaceAsString(),
+                                  pruneTable.getNameAsString()));
         }
       }
     }
   }
 
+  /**
+   * Stop and clear state related to pruning.
+   */
+  protected void resetPruneState() {
+    pruneEnable = false;
+    if (compactionState != null) {
+      compactionState.stop();
+      compactionState = null;
+    }
+  }
+
   private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) {
     long numStoreFiles = 0;
     for (Store store : c.getEnvironment().getRegion().getStores().values()) {

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index beed1ad..38c1a6f 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -22,9 +22,11 @@ import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
@@ -45,6 +47,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
   private final ConcurrentSkipListMap<byte[], Long> emptyRegions;
 
   private volatile Thread flushThread;
+  private volatile boolean stopped;
 
   private long lastChecked;
 
@@ -87,6 +90,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
   @Override
   protected void shutDown() throws Exception {
     LOG.info("Stopping PruneUpperBoundWriter Thread.");
+    stopped = true;
     if (flushThread != null) {
       flushThread.interrupt();
       flushThread.join(TimeUnit.SECONDS.toMillis(1));
@@ -97,27 +101,33 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
     flushThread = new Thread("tephra-prune-upper-bound-writer") {
       @Override
       public void run() {
-        while ((!isInterrupted()) && isRunning()) {
+        while ((!isInterrupted()) && (!stopped)) {
           long now = System.currentTimeMillis();
           if (now > (lastChecked + pruneFlushInterval)) {
             // should flush data
             try {
-              // Record prune upper bound
-              while (!pruneEntries.isEmpty()) {
-                Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
-                dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
-                // We can now remove the entry only if the key and value match with what we wrote since it is
-                // possible that a new pruneUpperBound for the same key has been added
-                pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
-              }
-              // Record empty regions
-              while (!emptyRegions.isEmpty()) {
-                Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
-                dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
-                // We can now remove the entry only if the key and value match with what we wrote since it is
-                // possible that a new value for the same key has been added
-                emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
-              }
+              User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                  // Record prune upper bound
+                  while (!pruneEntries.isEmpty()) {
+                    Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
+                    dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
+                    // We can now remove the entry only if the key and value match with what we wrote since it is
+                    // possible that a new pruneUpperBound for the same key has been added
+                    pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
+                  }
+                  // Record empty regions
+                  while (!emptyRegions.isEmpty()) {
+                    Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
+                    dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
+                    // We can now remove the entry only if the key and value match with what we wrote since it is
+                    // possible that a new value for the same key has been added
+                    emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
+                  }
+                  return null;
+                }
+              });
             } catch (IOException ex) {
               LOG.warn("Cannot record prune upper bound for a region to table " +
                          tableName.getNamespaceAsString() + ":" + tableName.getNameAsString(), ex);

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 5e1b4c5..02e2dac 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -107,12 +107,13 @@ public class TransactionProcessor extends BaseRegionObserver {
 
   private final TransactionCodec txCodec;
   private TransactionStateCache cache;
-  private CompactionState compactionState;
+  private volatile CompactionState compactionState;
+
+  protected volatile Boolean pruneEnable;
+  protected volatile Long txMaxLifetimeMillis;
   protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
   protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
   protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;
-  protected Long txMaxLifetimeMillis;
-  private Boolean pruneEnable;
 
   public TransactionProcessor() {
     this.txCodec = new TransactionCodec();
@@ -144,10 +145,12 @@ public class TransactionProcessor extends BaseRegionObserver {
       }
 
       this.allowEmptyValues = getAllowEmptyValues(env, tableDesc);
+      this.txMaxLifetimeMillis = getTxMaxLifetimeMillis(env);
       this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA));
       if (readNonTxnData) {
         LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString());
       }
+      initializePruneState(env);
     }
   }
 
@@ -171,9 +174,7 @@ public class TransactionProcessor extends BaseRegionObserver {
 
   @Override
   public void stop(CoprocessorEnvironment e) throws IOException {
-    if (compactionState != null) {
-      compactionState.stop();
-    }
+    resetPruneState();
   }
 
   @Override
@@ -248,6 +249,15 @@ public class TransactionProcessor extends BaseRegionObserver {
       Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig;
   }
 
+  private long getTxMaxLifetimeMillis(RegionCoprocessorEnvironment env) {
+    Configuration conf = getConfiguration(env);
+    if (conf != null) {
+      return TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
+                                                   TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
+    }
+    return TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME);
+  }
+
   private boolean isFamilyDelete(List<Cell> familyCells) {
     return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0));
   }
@@ -321,15 +331,10 @@ public class TransactionProcessor extends BaseRegionObserver {
     LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s",
                             region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles));
     if (memstoreSize == 0 && numStoreFiles == 0) {
-      if (pruneEnable == null) {
-        initPruneState(e);
-      }
-
-      if (Boolean.TRUE.equals(pruneEnable)) {
+      if (compactionState != null) {
         compactionState.persistRegionEmpty(System.currentTimeMillis());
       }
     }
-
   }
 
   @Override
@@ -340,12 +345,8 @@ public class TransactionProcessor extends BaseRegionObserver {
     // Get the latest tx snapshot state for the compaction
     TransactionVisibilityState snapshot = cache.getLatestState();
 
-    if (pruneEnable == null) {
-      initPruneState(c);
-    }
-
-    if (Boolean.TRUE.equals(pruneEnable)) {
-      // Record tx state before the compaction
+    // Record tx state before the compaction
+    if (compactionState != null) {
       compactionState.record(request, snapshot);
     }
 
@@ -356,7 +357,7 @@ public class TransactionProcessor extends BaseRegionObserver {
   @Override
   public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile,
                           CompactionRequest request) throws IOException {
-    // Persist the compaction state after a succesful compaction
+    // Persist the compaction state after a successful compaction
     if (compactionState != null) {
       compactionState.persist();
     }
@@ -416,21 +417,8 @@ public class TransactionProcessor extends BaseRegionObserver {
       return;
     }
 
-    if (txMaxLifetimeMillis == null) {
-      Configuration conf = getConfiguration(env);
-      // Configuration won't be null in TransactionProcessor but the derived classes might return
-      // null if it is not available temporarily
-      if (conf != null) {
-        this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
-                                                                         TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
-      } else {
-        throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " +
-                                              "unavailable. Please retry the operation."));
-      }
-    }
-
     boolean validLifetime =
-      TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis();
+      (TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis) > System.currentTimeMillis();
     if (!validLifetime) {
       throw new DoNotRetryIOException(String.format("Transaction %s has exceeded max lifetime %s ms",
                                                     tx.getTransactionId(), txMaxLifetimeMillis));
@@ -454,28 +442,47 @@ public class TransactionProcessor extends BaseRegionObserver {
     return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
   }
 
-  private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) {
-    Configuration conf = getConfiguration(c.getEnvironment());
-    // Configuration won't be null in TransactionProcessor but the derived classes might return
-    // null if it is not available temporarily
+  /**
+   * Refresh the properties related to transaction pruning. This method needs to be invoked if there is change in the
+   * prune related properties after clearing the state by calling {@link #resetPruneState}.
+   *
+   * @param env {@link RegionCoprocessorEnvironment} of this region
+   */
+  protected void initializePruneState(RegionCoprocessorEnvironment env) {
+    Configuration conf = getConfiguration(env);
     if (conf != null) {
       pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+
       if (Boolean.TRUE.equals(pruneEnable)) {
-        String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
-        long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
-          conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
-                       TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
-        compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+        TableName pruneTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                                          TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+        long pruneFlushInterval = TimeUnit.SECONDS.toMillis(conf.getLong(
+          TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+          TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+
+        compactionState = new CompactionState(env, pruneTable, pruneFlushInterval);
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
-                      + pruneTable);
+          LOG.debug(String.format("Automatic invalid list pruning is enabled for table %s. Compaction state " +
+                                    "will be recorded in table %s",
+                                  env.getRegionInfo().getTable().getNameWithNamespaceInclAsString(),
+                                  pruneTable.getNameWithNamespaceInclAsString()));
         }
       }
     }
   }
 
+  /**
+   * Stop and clear state related to pruning.
+   */
+  protected void resetPruneState() {
+    pruneEnable = false;
+    if (compactionState != null) {
+      compactionState.stop();
+      compactionState = null;
+    }
+  }
+
   private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) {
     long numStoreFiles = 0;
     for (Store store : c.getEnvironment().getRegion().getStores()) {

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 9773a15..6bd8bab 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -22,9 +22,11 @@ import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
@@ -45,6 +47,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
   private final ConcurrentSkipListMap<byte[], Long> emptyRegions;
 
   private volatile Thread flushThread;
+  private volatile boolean stopped;
 
   private long lastChecked;
 
@@ -87,6 +90,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
   @Override
   protected void shutDown() throws Exception {
     LOG.info("Stopping PruneUpperBoundWriter Thread.");
+    stopped = true;
     if (flushThread != null) {
       flushThread.interrupt();
       flushThread.join(TimeUnit.SECONDS.toMillis(1));
@@ -97,27 +101,33 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
     flushThread = new Thread("tephra-prune-upper-bound-writer") {
       @Override
       public void run() {
-        while ((!isInterrupted()) && isRunning()) {
+        while ((!isInterrupted()) && (!stopped)) {
           long now = System.currentTimeMillis();
           if (now > (lastChecked + pruneFlushInterval)) {
             // should flush data
             try {
-              // Record prune upper bound
-              while (!pruneEntries.isEmpty()) {
-                Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
-                dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
-                // We can now remove the entry only if the key and value match with what we wrote since it is
-                // possible that a new pruneUpperBound for the same key has been added
-                pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
-              }
-              // Record empty regions
-              while (!emptyRegions.isEmpty()) {
-                Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
-                dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
-                // We can now remove the entry only if the key and value match with what we wrote since it is
-                // possible that a new value for the same key has been added
-                emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
-              }
+              User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                  // Record prune upper bound
+                  while (!pruneEntries.isEmpty()) {
+                    Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
+                    dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
+                    // We can now remove the entry only if the key and value match with what we wrote since it is
+                    // possible that a new pruneUpperBound for the same key has been added
+                    pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
+                  }
+                  // Record empty regions
+                  while (!emptyRegions.isEmpty()) {
+                    Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
+                    dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
+                    // We can now remove the entry only if the key and value match with what we wrote since it is
+                    // possible that a new value for the same key has been added
+                    emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
+                  }
+                  return null;
+                }
+              });
             } catch (IOException ex) {
               LOG.warn("Cannot record prune upper bound for a region to table " +
                          tableName.getNameWithNamespaceInclAsString(), ex);