You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by go...@apache.org on 2017/02/14 05:40:28 UTC
incubator-tephra git commit: TEPHRA-219 execute cross region calls in
coprocessor as the login user.
Repository: incubator-tephra
Updated Branches:
refs/heads/master 2af5ac2bd -> 203825e29
TEPHRA-219 execute cross region calls in coprocessor as the login user.
This closes #35 from GitHub.
Make pruneEnable and txMaxLifetimeMillis volatile so that derived
classes can make use of it.
Introduced stopped variable in PruneUpperBoundWriter.
Signed-off-by: Gokul Gunasekaran <go...@cask.co>
Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/203825e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/203825e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/203825e2
Branch: refs/heads/master
Commit: 203825e29af43b24029ef83dc512a4b8a3deeadc
Parents: 2af5ac2
Author: Gokul Gunasekaran <go...@cask.co>
Authored: Sat Feb 11 19:42:02 2017 -0800
Committer: Gokul Gunasekaran <go...@cask.co>
Committed: Mon Feb 13 21:40:11 2017 -0800
----------------------------------------------------------------------
.../hbase/coprocessor/TransactionProcessor.java | 95 ++++++++++---------
.../hbase/txprune/PruneUpperBoundWriter.java | 58 ++++++++----
.../hbase/coprocessor/TransactionProcessor.java | 97 +++++++++++---------
.../hbase/txprune/PruneUpperBoundWriter.java | 44 +++++----
.../hbase/coprocessor/TransactionProcessor.java | 97 +++++++++++---------
.../hbase/txprune/PruneUpperBoundWriter.java | 44 +++++----
.../hbase/coprocessor/TransactionProcessor.java | 97 +++++++++++---------
.../hbase/txprune/PruneUpperBoundWriter.java | 44 +++++----
.../hbase/coprocessor/TransactionProcessor.java | 97 +++++++++++---------
.../hbase/txprune/PruneUpperBoundWriter.java | 44 +++++----
10 files changed, 405 insertions(+), 312 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 9ff4d3b..d2402a6 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -107,12 +107,13 @@ public class TransactionProcessor extends BaseRegionObserver {
private final TransactionCodec txCodec;
private TransactionStateCache cache;
- private CompactionState compactionState;
+ private volatile CompactionState compactionState;
+
+ protected volatile Boolean pruneEnable;
+ protected volatile Long txMaxLifetimeMillis;
protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;
- protected Long txMaxLifetimeMillis;
- private Boolean pruneEnable;
public TransactionProcessor() {
this.txCodec = new TransactionCodec();
@@ -144,10 +145,12 @@ public class TransactionProcessor extends BaseRegionObserver {
}
this.allowEmptyValues = getAllowEmptyValues(env, tableDesc);
+ this.txMaxLifetimeMillis = getTxMaxLifetimeMillis(env);
this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA));
if (readNonTxnData) {
LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString());
}
+ initializePruneState(env);
}
}
@@ -171,9 +174,7 @@ public class TransactionProcessor extends BaseRegionObserver {
@Override
public void stop(CoprocessorEnvironment e) throws IOException {
- if (compactionState != null) {
- compactionState.stop();
- }
+ resetPruneState();
}
@Override
@@ -248,6 +249,15 @@ public class TransactionProcessor extends BaseRegionObserver {
Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig;
}
+ private long getTxMaxLifetimeMillis(RegionCoprocessorEnvironment env) {
+ Configuration conf = getConfiguration(env);
+ if (conf != null) {
+ return TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
+ TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
+ }
+ return TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME);
+ }
+
private boolean isFamilyDelete(List<Cell> familyCells) {
return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0));
}
@@ -321,15 +331,10 @@ public class TransactionProcessor extends BaseRegionObserver {
LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s",
region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles));
if (memstoreSize == 0 && numStoreFiles == 0) {
- if (pruneEnable == null) {
- initPruneState(e);
- }
-
- if (Boolean.TRUE.equals(pruneEnable)) {
+ if (compactionState != null) {
compactionState.persistRegionEmpty(System.currentTimeMillis());
}
}
-
}
@Override
@@ -340,12 +345,8 @@ public class TransactionProcessor extends BaseRegionObserver {
// Get the latest tx snapshot state for the compaction
TransactionVisibilityState snapshot = cache.getLatestState();
- if (pruneEnable == null) {
- initPruneState(c);
- }
-
- if (Boolean.TRUE.equals(pruneEnable)) {
- // Record tx state before the compaction
+ // Record tx state before the compaction
+ if (compactionState != null) {
compactionState.record(request, snapshot);
}
@@ -416,21 +417,8 @@ public class TransactionProcessor extends BaseRegionObserver {
return;
}
- if (txMaxLifetimeMillis == null) {
- Configuration conf = getConfiguration(env);
- // Configuration won't be null in TransactionProcessor but the derived classes might return
- // null if it is not available temporarily
- if (conf != null) {
- this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
- TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
- } else {
- throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " +
- "unavailable. Please retry the operation."));
- }
- }
-
boolean validLifetime =
- TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis();
+ (TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis) > System.currentTimeMillis();
if (!validLifetime) {
throw new DoNotRetryIOException(String.format("Transaction %s has exceeded max lifetime %s ms",
tx.getTransactionId(), txMaxLifetimeMillis));
@@ -454,28 +442,47 @@ public class TransactionProcessor extends BaseRegionObserver {
return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
}
- private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) {
- Configuration conf = getConfiguration(c.getEnvironment());
- // Configuration won't be null in TransactionProcessor but the derived classes might return
- // null if it is not available temporarily
+ /**
+ * Refresh the properties related to transaction pruning. This method needs to be invoked if there is change in the
+ * prune related properties after clearing the state by calling {@link #resetPruneState}.
+ *
+ * @param env {@link RegionCoprocessorEnvironment} of this region
+ */
+ protected void initializePruneState(RegionCoprocessorEnvironment env) {
+ Configuration conf = getConfiguration(env);
if (conf != null) {
pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+
if (Boolean.TRUE.equals(pruneEnable)) {
- String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
- long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
- conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
- compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+ TableName pruneTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+ long pruneFlushInterval = TimeUnit.SECONDS.toMillis(conf.getLong(
+ TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+
+ compactionState = new CompactionState(env, pruneTable, pruneFlushInterval);
if (LOG.isDebugEnabled()) {
- LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
- + pruneTable);
+ TableName name = env.getRegion().getRegionInfo().getTable();
+ LOG.debug(String.format("Automatic invalid list pruning is enabled for table %s:%s. Compaction state will " +
+ "be recorded in table %s:%s", name.getNamespaceAsString(), name.getNameAsString(),
+ pruneTable.getNamespaceAsString(), pruneTable.getNameAsString()));
}
}
}
}
+ /**
+ * Stop and clear state related to pruning.
+ */
+ protected void resetPruneState() {
+ pruneEnable = false;
+ if (compactionState != null) {
+ compactionState.stop();
+ compactionState = null;
+ }
+ }
+
private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) {
long numStoreFiles = 0;
for (Store store : c.getEnvironment().getRegion().getStores().values()) {
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index beed1ad..7e4a0fa 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -23,8 +23,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
@@ -45,6 +47,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
private final ConcurrentSkipListMap<byte[], Long> emptyRegions;
private volatile Thread flushThread;
+ private volatile boolean stopped;
private long lastChecked;
@@ -87,6 +90,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
@Override
protected void shutDown() throws Exception {
LOG.info("Stopping PruneUpperBoundWriter Thread.");
+ stopped = true;
if (flushThread != null) {
flushThread.interrupt();
flushThread.join(TimeUnit.SECONDS.toMillis(1));
@@ -97,30 +101,36 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
flushThread = new Thread("tephra-prune-upper-bound-writer") {
@Override
public void run() {
- while ((!isInterrupted()) && isRunning()) {
+ while ((!isInterrupted()) && (!stopped)) {
long now = System.currentTimeMillis();
if (now > (lastChecked + pruneFlushInterval)) {
// should flush data
try {
- // Record prune upper bound
- while (!pruneEntries.isEmpty()) {
- Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
- dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
- // We can now remove the entry only if the key and value match with what we wrote since it is
- // possible that a new pruneUpperBound for the same key has been added
- pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
- }
- // Record empty regions
- while (!emptyRegions.isEmpty()) {
- Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
- dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
- // We can now remove the entry only if the key and value match with what we wrote since it is
- // possible that a new value for the same key has been added
- emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
- }
- } catch (IOException ex) {
- LOG.warn("Cannot record prune upper bound for a region to table " +
- tableName.getNamespaceAsString() + ":" + tableName.getNameAsString(), ex);
+ UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ // Record prune upper bound
+ while (!pruneEntries.isEmpty()) {
+ Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
+ dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
+ // We can now remove the entry only if the key and value match with what we wrote since it is
+ // possible that a new pruneUpperBound for the same key has been added
+ pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
+ }
+ // Record empty regions
+ while (!emptyRegions.isEmpty()) {
+ Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
+ dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
+ // We can now remove the entry only if the key and value match with what we wrote since it is
+ // possible that a new value for the same key has been added
+ emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
+ }
+ return null;
+ }
+ });
+ } catch (IOException | InterruptedException ex) {
+ // Handle any exception that might be thrown during HBase operation
+ handleException(ex);
}
lastChecked = now;
}
@@ -147,4 +157,12 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
Bytes.toStringBinary(regionName), isRunning() ? "alive" : "running"));
}
}
+
+ private void handleException(Exception ex) {
+ LOG.warn("Cannot record prune upper bound for a region to table " +
+ tableName.getNamespaceAsString() + ":" + tableName.getNameAsString(), ex);
+ if (ex instanceof IOException) {
+ Thread.currentThread().interrupt();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 7485b91..84776cf 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -107,12 +107,13 @@ public class TransactionProcessor extends BaseRegionObserver {
private final TransactionCodec txCodec;
private TransactionStateCache cache;
- private CompactionState compactionState;
+ private volatile CompactionState compactionState;
+
+ protected volatile Boolean pruneEnable;
+ protected volatile Long txMaxLifetimeMillis;
protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;
- protected Long txMaxLifetimeMillis;
- private Boolean pruneEnable;
public TransactionProcessor() {
this.txCodec = new TransactionCodec();
@@ -144,10 +145,12 @@ public class TransactionProcessor extends BaseRegionObserver {
}
this.allowEmptyValues = getAllowEmptyValues(env, tableDesc);
+ this.txMaxLifetimeMillis = getTxMaxLifetimeMillis(env);
this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA));
if (readNonTxnData) {
LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString());
}
+ initializePruneState(env);
}
}
@@ -171,9 +174,7 @@ public class TransactionProcessor extends BaseRegionObserver {
@Override
public void stop(CoprocessorEnvironment e) throws IOException {
- if (compactionState != null) {
- compactionState.stop();
- }
+ resetPruneState();
}
@Override
@@ -248,6 +249,15 @@ public class TransactionProcessor extends BaseRegionObserver {
Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig;
}
+ private long getTxMaxLifetimeMillis(RegionCoprocessorEnvironment env) {
+ Configuration conf = getConfiguration(env);
+ if (conf != null) {
+ return TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
+ TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
+ }
+ return TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME);
+ }
+
private boolean isFamilyDelete(List<Cell> familyCells) {
return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0));
}
@@ -321,15 +331,10 @@ public class TransactionProcessor extends BaseRegionObserver {
LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s",
region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles));
if (memstoreSize == 0 && numStoreFiles == 0) {
- if (pruneEnable == null) {
- initPruneState(e);
- }
-
- if (Boolean.TRUE.equals(pruneEnable)) {
+ if (compactionState != null) {
compactionState.persistRegionEmpty(System.currentTimeMillis());
}
}
-
}
@Override
@@ -340,12 +345,8 @@ public class TransactionProcessor extends BaseRegionObserver {
// Get the latest tx snapshot state for the compaction
TransactionVisibilityState snapshot = cache.getLatestState();
- if (pruneEnable == null) {
- initPruneState(c);
- }
-
- if (Boolean.TRUE.equals(pruneEnable)) {
- // Record tx state before the compaction
+ // Record tx state before the compaction
+ if (compactionState != null) {
compactionState.record(request, snapshot);
}
@@ -356,7 +357,7 @@ public class TransactionProcessor extends BaseRegionObserver {
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile,
CompactionRequest request) throws IOException {
- // Persist the compaction state after a succesful compaction
+ // Persist the compaction state after a successful compaction
if (compactionState != null) {
compactionState.persist();
}
@@ -416,21 +417,8 @@ public class TransactionProcessor extends BaseRegionObserver {
return;
}
- if (txMaxLifetimeMillis == null) {
- Configuration conf = getConfiguration(env);
- // Configuration won't be null in TransactionProcessor but the derived classes might return
- // null if it is not available temporarily
- if (conf != null) {
- this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
- TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
- } else {
- throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " +
- "unavailable. Please retry the operation."));
- }
- }
-
boolean validLifetime =
- TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis();
+ (TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis) > System.currentTimeMillis();
if (!validLifetime) {
throw new DoNotRetryIOException(String.format("Transaction %s has exceeded max lifetime %s ms",
tx.getTransactionId(), txMaxLifetimeMillis));
@@ -454,28 +442,47 @@ public class TransactionProcessor extends BaseRegionObserver {
return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
}
- private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) {
- Configuration conf = getConfiguration(c.getEnvironment());
- // Configuration won't be null in TransactionProcessor but the derived classes might return
- // null if it is not available temporarily
+ /**
+ * Refresh the properties related to transaction pruning. This method needs to be invoked if there is change in the
+ * prune related properties after clearing the state by calling {@link #resetPruneState}.
+ *
+ * @param env {@link RegionCoprocessorEnvironment} of this region
+ */
+ protected void initializePruneState(RegionCoprocessorEnvironment env) {
+ Configuration conf = getConfiguration(env);
if (conf != null) {
pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+
if (Boolean.TRUE.equals(pruneEnable)) {
- String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
- long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
- conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
- compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+ TableName pruneTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+ long pruneFlushInterval = TimeUnit.SECONDS.toMillis(conf.getLong(
+ TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+
+ compactionState = new CompactionState(env, pruneTable, pruneFlushInterval);
if (LOG.isDebugEnabled()) {
- LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
- + pruneTable);
+ TableName name = env.getRegion().getRegionInfo().getTable();
+ LOG.debug(String.format("Automatic invalid list pruning is enabled for table %s:%s. Compaction state will " +
+ "be recorded in table %s:%s", name.getNamespaceAsString(), name.getNameAsString(),
+ pruneTable.getNamespaceAsString(), pruneTable.getNameAsString()));
}
}
}
}
+ /**
+ * Stop and clear state related to pruning.
+ */
+ protected void resetPruneState() {
+ pruneEnable = false;
+ if (compactionState != null) {
+ compactionState.stop();
+ compactionState = null;
+ }
+ }
+
private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) {
long numStoreFiles = 0;
for (Store store : c.getEnvironment().getRegion().getStores().values()) {
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index beed1ad..38c1a6f 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -22,9 +22,11 @@ import com.google.common.util.concurrent.AbstractIdleService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
@@ -45,6 +47,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
private final ConcurrentSkipListMap<byte[], Long> emptyRegions;
private volatile Thread flushThread;
+ private volatile boolean stopped;
private long lastChecked;
@@ -87,6 +90,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
@Override
protected void shutDown() throws Exception {
LOG.info("Stopping PruneUpperBoundWriter Thread.");
+ stopped = true;
if (flushThread != null) {
flushThread.interrupt();
flushThread.join(TimeUnit.SECONDS.toMillis(1));
@@ -97,27 +101,33 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
flushThread = new Thread("tephra-prune-upper-bound-writer") {
@Override
public void run() {
- while ((!isInterrupted()) && isRunning()) {
+ while ((!isInterrupted()) && (!stopped)) {
long now = System.currentTimeMillis();
if (now > (lastChecked + pruneFlushInterval)) {
// should flush data
try {
- // Record prune upper bound
- while (!pruneEntries.isEmpty()) {
- Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
- dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
- // We can now remove the entry only if the key and value match with what we wrote since it is
- // possible that a new pruneUpperBound for the same key has been added
- pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
- }
- // Record empty regions
- while (!emptyRegions.isEmpty()) {
- Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
- dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
- // We can now remove the entry only if the key and value match with what we wrote since it is
- // possible that a new value for the same key has been added
- emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
- }
+ User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ // Record prune upper bound
+ while (!pruneEntries.isEmpty()) {
+ Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
+ dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
+ // We can now remove the entry only if the key and value match with what we wrote since it is
+ // possible that a new pruneUpperBound for the same key has been added
+ pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
+ }
+ // Record empty regions
+ while (!emptyRegions.isEmpty()) {
+ Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
+ dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
+ // We can now remove the entry only if the key and value match with what we wrote since it is
+ // possible that a new value for the same key has been added
+ emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
+ }
+ return null;
+ }
+ });
} catch (IOException ex) {
LOG.warn("Cannot record prune upper bound for a region to table " +
tableName.getNamespaceAsString() + ":" + tableName.getNameAsString(), ex);
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 7485b91..b73bdc1 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -107,12 +107,13 @@ public class TransactionProcessor extends BaseRegionObserver {
private final TransactionCodec txCodec;
private TransactionStateCache cache;
- private CompactionState compactionState;
+ private volatile CompactionState compactionState;
+
+ protected volatile Boolean pruneEnable;
+ protected volatile Long txMaxLifetimeMillis;
protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;
- protected Long txMaxLifetimeMillis;
- private Boolean pruneEnable;
public TransactionProcessor() {
this.txCodec = new TransactionCodec();
@@ -144,10 +145,12 @@ public class TransactionProcessor extends BaseRegionObserver {
}
this.allowEmptyValues = getAllowEmptyValues(env, tableDesc);
+ this.txMaxLifetimeMillis = getTxMaxLifetimeMillis(env);
this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA));
if (readNonTxnData) {
LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString());
}
+ initializePruneState(env);
}
}
@@ -171,9 +174,7 @@ public class TransactionProcessor extends BaseRegionObserver {
@Override
public void stop(CoprocessorEnvironment e) throws IOException {
- if (compactionState != null) {
- compactionState.stop();
- }
+ resetPruneState();
}
@Override
@@ -248,6 +249,15 @@ public class TransactionProcessor extends BaseRegionObserver {
Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig;
}
+ private long getTxMaxLifetimeMillis(RegionCoprocessorEnvironment env) {
+ Configuration conf = getConfiguration(env);
+ if (conf != null) {
+ return TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
+ TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
+ }
+ return TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME);
+ }
+
private boolean isFamilyDelete(List<Cell> familyCells) {
return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0));
}
@@ -321,15 +331,10 @@ public class TransactionProcessor extends BaseRegionObserver {
LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s",
region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles));
if (memstoreSize == 0 && numStoreFiles == 0) {
- if (pruneEnable == null) {
- initPruneState(e);
- }
-
- if (Boolean.TRUE.equals(pruneEnable)) {
+ if (compactionState != null) {
compactionState.persistRegionEmpty(System.currentTimeMillis());
}
}
-
}
@Override
@@ -340,12 +345,8 @@ public class TransactionProcessor extends BaseRegionObserver {
// Get the latest tx snapshot state for the compaction
TransactionVisibilityState snapshot = cache.getLatestState();
- if (pruneEnable == null) {
- initPruneState(c);
- }
-
- if (Boolean.TRUE.equals(pruneEnable)) {
- // Record tx state before the compaction
+ // Record tx state before the compaction
+ if (compactionState != null) {
compactionState.record(request, snapshot);
}
@@ -356,7 +357,7 @@ public class TransactionProcessor extends BaseRegionObserver {
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile,
CompactionRequest request) throws IOException {
- // Persist the compaction state after a succesful compaction
+ // Persist the compaction state after a successful compaction
if (compactionState != null) {
compactionState.persist();
}
@@ -416,21 +417,8 @@ public class TransactionProcessor extends BaseRegionObserver {
return;
}
- if (txMaxLifetimeMillis == null) {
- Configuration conf = getConfiguration(env);
- // Configuration won't be null in TransactionProcessor but the derived classes might return
- // null if it is not available temporarily
- if (conf != null) {
- this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
- TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
- } else {
- throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " +
- "unavailable. Please retry the operation."));
- }
- }
-
boolean validLifetime =
- TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis();
+ (TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis) > System.currentTimeMillis();
if (!validLifetime) {
throw new DoNotRetryIOException(String.format("Transaction %s has exceeded max lifetime %s ms",
tx.getTransactionId(), txMaxLifetimeMillis));
@@ -454,28 +442,47 @@ public class TransactionProcessor extends BaseRegionObserver {
return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
}
- private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) {
- Configuration conf = getConfiguration(c.getEnvironment());
- // Configuration won't be null in TransactionProcessor but the derived classes might return
- // null if it is not available temporarily
+ /**
+ * Refresh the properties related to transaction pruning. This method needs to be invoked if there is change in the
+ * prune related properties after clearing the state by calling {@link #resetPruneState}.
+ *
+ * @param env {@link RegionCoprocessorEnvironment} of this region
+ */
+ protected void initializePruneState(RegionCoprocessorEnvironment env) {
+ Configuration conf = getConfiguration(env);
if (conf != null) {
pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+
if (Boolean.TRUE.equals(pruneEnable)) {
- String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
- long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
- conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
- compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+ TableName pruneTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+ long pruneFlushInterval = TimeUnit.SECONDS.toMillis(conf.getLong(
+ TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+
+ compactionState = new CompactionState(env, pruneTable, pruneFlushInterval);
if (LOG.isDebugEnabled()) {
- LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
- + pruneTable);
+ LOG.debug(String.format("Automatic invalid list pruning is enabled for table %s. Compaction state " +
+ "will be recorded in table %s",
+ env.getRegionInfo().getTable().getNameWithNamespaceInclAsString(),
+ pruneTable.getNameWithNamespaceInclAsString()));
}
}
}
}
+ /**
+ * Stop and clear state related to pruning.
+ */
+ protected void resetPruneState() {
+ pruneEnable = false;
+ if (compactionState != null) {
+ compactionState.stop();
+ compactionState = null;
+ }
+ }
+
private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) {
long numStoreFiles = 0;
for (Store store : c.getEnvironment().getRegion().getStores().values()) {
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 9773a15..6bd8bab 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -22,9 +22,11 @@ import com.google.common.util.concurrent.AbstractIdleService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
@@ -45,6 +47,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
private final ConcurrentSkipListMap<byte[], Long> emptyRegions;
private volatile Thread flushThread;
+ private volatile boolean stopped;
private long lastChecked;
@@ -87,6 +90,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
@Override
protected void shutDown() throws Exception {
LOG.info("Stopping PruneUpperBoundWriter Thread.");
+ stopped = true;
if (flushThread != null) {
flushThread.interrupt();
flushThread.join(TimeUnit.SECONDS.toMillis(1));
@@ -97,27 +101,33 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
flushThread = new Thread("tephra-prune-upper-bound-writer") {
@Override
public void run() {
- while ((!isInterrupted()) && isRunning()) {
+ while ((!isInterrupted()) && (!stopped)) {
long now = System.currentTimeMillis();
if (now > (lastChecked + pruneFlushInterval)) {
// should flush data
try {
- // Record prune upper bound
- while (!pruneEntries.isEmpty()) {
- Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
- dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
- // We can now remove the entry only if the key and value match with what we wrote since it is
- // possible that a new pruneUpperBound for the same key has been added
- pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
- }
- // Record empty regions
- while (!emptyRegions.isEmpty()) {
- Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
- dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
- // We can now remove the entry only if the key and value match with what we wrote since it is
- // possible that a new value for the same key has been added
- emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
- }
+ User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ // Record prune upper bound
+ while (!pruneEntries.isEmpty()) {
+ Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
+ dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
+ // We can now remove the entry only if the key and value match with what we wrote since it is
+ // possible that a new pruneUpperBound for the same key has been added
+ pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
+ }
+ // Record empty regions
+ while (!emptyRegions.isEmpty()) {
+ Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
+ dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
+ // We can now remove the entry only if the key and value match with what we wrote since it is
+ // possible that a new value for the same key has been added
+ emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
+ }
+ return null;
+ }
+ });
} catch (IOException ex) {
LOG.warn("Cannot record prune upper bound for a region to table " +
tableName.getNameWithNamespaceInclAsString(), ex);
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 7485b91..f9bb35e 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -107,12 +107,13 @@ public class TransactionProcessor extends BaseRegionObserver {
private final TransactionCodec txCodec;
private TransactionStateCache cache;
- private CompactionState compactionState;
+ private volatile CompactionState compactionState;
+
+ protected volatile Boolean pruneEnable;
+ protected volatile Long txMaxLifetimeMillis;
protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;
- protected Long txMaxLifetimeMillis;
- private Boolean pruneEnable;
public TransactionProcessor() {
this.txCodec = new TransactionCodec();
@@ -144,10 +145,12 @@ public class TransactionProcessor extends BaseRegionObserver {
}
this.allowEmptyValues = getAllowEmptyValues(env, tableDesc);
+ this.txMaxLifetimeMillis = getTxMaxLifetimeMillis(env);
this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA));
if (readNonTxnData) {
LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString());
}
+ initializePruneState(env);
}
}
@@ -171,9 +174,7 @@ public class TransactionProcessor extends BaseRegionObserver {
@Override
public void stop(CoprocessorEnvironment e) throws IOException {
- if (compactionState != null) {
- compactionState.stop();
- }
+ resetPruneState();
}
@Override
@@ -248,6 +249,15 @@ public class TransactionProcessor extends BaseRegionObserver {
Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig;
}
+ private long getTxMaxLifetimeMillis(RegionCoprocessorEnvironment env) {
+ Configuration conf = getConfiguration(env);
+ if (conf != null) {
+ return TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
+ TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
+ }
+ return TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME);
+ }
+
private boolean isFamilyDelete(List<Cell> familyCells) {
return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0));
}
@@ -321,15 +331,10 @@ public class TransactionProcessor extends BaseRegionObserver {
LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s",
region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles));
if (memstoreSize == 0 && numStoreFiles == 0) {
- if (pruneEnable == null) {
- initPruneState(e);
- }
-
- if (Boolean.TRUE.equals(pruneEnable)) {
+ if (compactionState != null) {
compactionState.persistRegionEmpty(System.currentTimeMillis());
}
}
-
}
@Override
@@ -340,12 +345,8 @@ public class TransactionProcessor extends BaseRegionObserver {
// Get the latest tx snapshot state for the compaction
TransactionVisibilityState snapshot = cache.getLatestState();
- if (pruneEnable == null) {
- initPruneState(c);
- }
-
- if (Boolean.TRUE.equals(pruneEnable)) {
- // Record tx state before the compaction
+ // Record tx state before the compaction
+ if (compactionState != null) {
compactionState.record(request, snapshot);
}
@@ -356,7 +357,7 @@ public class TransactionProcessor extends BaseRegionObserver {
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile,
CompactionRequest request) throws IOException {
- // Persist the compaction state after a succesful compaction
+ // Persist the compaction state after a successful compaction
if (compactionState != null) {
compactionState.persist();
}
@@ -416,21 +417,8 @@ public class TransactionProcessor extends BaseRegionObserver {
return;
}
- if (txMaxLifetimeMillis == null) {
- Configuration conf = getConfiguration(env);
- // Configuration won't be null in TransactionProcessor but the derived classes might return
- // null if it is not available temporarily
- if (conf != null) {
- this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
- TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
- } else {
- throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " +
- "unavailable. Please retry the operation."));
- }
- }
-
boolean validLifetime =
- TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis();
+ (TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis) > System.currentTimeMillis();
if (!validLifetime) {
throw new DoNotRetryIOException(String.format("Transaction %s has exceeded max lifetime %s ms",
tx.getTransactionId(), txMaxLifetimeMillis));
@@ -454,28 +442,47 @@ public class TransactionProcessor extends BaseRegionObserver {
return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
}
- private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) {
- Configuration conf = getConfiguration(c.getEnvironment());
- // Configuration won't be null in TransactionProcessor but the derived classes might return
- // null if it is not available temporarily
+ /**
+ * Refresh the properties related to transaction pruning. This method needs to be invoked if there is change in the
+ * prune related properties after clearing the state by calling {@link #resetPruneState}.
+ *
+ * @param env {@link RegionCoprocessorEnvironment} of this region
+ */
+ protected void initializePruneState(RegionCoprocessorEnvironment env) {
+ Configuration conf = getConfiguration(env);
if (conf != null) {
pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+
if (Boolean.TRUE.equals(pruneEnable)) {
- String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
- long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
- conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
- compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+ TableName pruneTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+ long pruneFlushInterval = TimeUnit.SECONDS.toMillis(conf.getLong(
+ TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+
+ compactionState = new CompactionState(env, pruneTable, pruneFlushInterval);
if (LOG.isDebugEnabled()) {
- LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
- + pruneTable);
+ LOG.debug(String.format("Automatic invalid list pruning is enabled for table %s:%s. Compaction state will " +
+ "be recorded in table %s:%s", env.getRegionInfo().getTable().getNamespaceAsString(),
+ env.getRegionInfo().getTable().getNameAsString(), pruneTable.getNamespaceAsString(),
+ pruneTable.getNameAsString()));
}
}
}
}
+ /**
+ * Stop and clear state related to pruning.
+ */
+ protected void resetPruneState() {
+ pruneEnable = false;
+ if (compactionState != null) {
+ compactionState.stop();
+ compactionState = null;
+ }
+ }
+
private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) {
long numStoreFiles = 0;
for (Store store : c.getEnvironment().getRegion().getStores().values()) {
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index beed1ad..38c1a6f 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -22,9 +22,11 @@ import com.google.common.util.concurrent.AbstractIdleService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
@@ -45,6 +47,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
private final ConcurrentSkipListMap<byte[], Long> emptyRegions;
private volatile Thread flushThread;
+ private volatile boolean stopped;
private long lastChecked;
@@ -87,6 +90,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
@Override
protected void shutDown() throws Exception {
LOG.info("Stopping PruneUpperBoundWriter Thread.");
+ stopped = true;
if (flushThread != null) {
flushThread.interrupt();
flushThread.join(TimeUnit.SECONDS.toMillis(1));
@@ -97,27 +101,33 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
flushThread = new Thread("tephra-prune-upper-bound-writer") {
@Override
public void run() {
- while ((!isInterrupted()) && isRunning()) {
+ while ((!isInterrupted()) && (!stopped)) {
long now = System.currentTimeMillis();
if (now > (lastChecked + pruneFlushInterval)) {
// should flush data
try {
- // Record prune upper bound
- while (!pruneEntries.isEmpty()) {
- Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
- dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
- // We can now remove the entry only if the key and value match with what we wrote since it is
- // possible that a new pruneUpperBound for the same key has been added
- pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
- }
- // Record empty regions
- while (!emptyRegions.isEmpty()) {
- Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
- dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
- // We can now remove the entry only if the key and value match with what we wrote since it is
- // possible that a new value for the same key has been added
- emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
- }
+ User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ // Record prune upper bound
+ while (!pruneEntries.isEmpty()) {
+ Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
+ dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
+ // We can now remove the entry only if the key and value match with what we wrote since it is
+ // possible that a new pruneUpperBound for the same key has been added
+ pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
+ }
+ // Record empty regions
+ while (!emptyRegions.isEmpty()) {
+ Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
+ dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
+ // We can now remove the entry only if the key and value match with what we wrote since it is
+ // possible that a new value for the same key has been added
+ emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
+ }
+ return null;
+ }
+ });
} catch (IOException ex) {
LOG.warn("Cannot record prune upper bound for a region to table " +
tableName.getNamespaceAsString() + ":" + tableName.getNameAsString(), ex);
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 5e1b4c5..02e2dac 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -107,12 +107,13 @@ public class TransactionProcessor extends BaseRegionObserver {
private final TransactionCodec txCodec;
private TransactionStateCache cache;
- private CompactionState compactionState;
+ private volatile CompactionState compactionState;
+
+ protected volatile Boolean pruneEnable;
+ protected volatile Long txMaxLifetimeMillis;
protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;
- protected Long txMaxLifetimeMillis;
- private Boolean pruneEnable;
public TransactionProcessor() {
this.txCodec = new TransactionCodec();
@@ -144,10 +145,12 @@ public class TransactionProcessor extends BaseRegionObserver {
}
this.allowEmptyValues = getAllowEmptyValues(env, tableDesc);
+ this.txMaxLifetimeMillis = getTxMaxLifetimeMillis(env);
this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA));
if (readNonTxnData) {
LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString());
}
+ initializePruneState(env);
}
}
@@ -171,9 +174,7 @@ public class TransactionProcessor extends BaseRegionObserver {
@Override
public void stop(CoprocessorEnvironment e) throws IOException {
- if (compactionState != null) {
- compactionState.stop();
- }
+ resetPruneState();
}
@Override
@@ -248,6 +249,15 @@ public class TransactionProcessor extends BaseRegionObserver {
Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig;
}
+ private long getTxMaxLifetimeMillis(RegionCoprocessorEnvironment env) {
+ Configuration conf = getConfiguration(env);
+ if (conf != null) {
+ return TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
+ TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
+ }
+ return TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME);
+ }
+
private boolean isFamilyDelete(List<Cell> familyCells) {
return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0));
}
@@ -321,15 +331,10 @@ public class TransactionProcessor extends BaseRegionObserver {
LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s",
region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles));
if (memstoreSize == 0 && numStoreFiles == 0) {
- if (pruneEnable == null) {
- initPruneState(e);
- }
-
- if (Boolean.TRUE.equals(pruneEnable)) {
+ if (compactionState != null) {
compactionState.persistRegionEmpty(System.currentTimeMillis());
}
}
-
}
@Override
@@ -340,12 +345,8 @@ public class TransactionProcessor extends BaseRegionObserver {
// Get the latest tx snapshot state for the compaction
TransactionVisibilityState snapshot = cache.getLatestState();
- if (pruneEnable == null) {
- initPruneState(c);
- }
-
- if (Boolean.TRUE.equals(pruneEnable)) {
- // Record tx state before the compaction
+ // Record tx state before the compaction
+ if (compactionState != null) {
compactionState.record(request, snapshot);
}
@@ -356,7 +357,7 @@ public class TransactionProcessor extends BaseRegionObserver {
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile,
CompactionRequest request) throws IOException {
- // Persist the compaction state after a succesful compaction
+ // Persist the compaction state after a successful compaction
if (compactionState != null) {
compactionState.persist();
}
@@ -416,21 +417,8 @@ public class TransactionProcessor extends BaseRegionObserver {
return;
}
- if (txMaxLifetimeMillis == null) {
- Configuration conf = getConfiguration(env);
- // Configuration won't be null in TransactionProcessor but the derived classes might return
- // null if it is not available temporarily
- if (conf != null) {
- this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
- TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
- } else {
- throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " +
- "unavailable. Please retry the operation."));
- }
- }
-
boolean validLifetime =
- TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis();
+ (TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis) > System.currentTimeMillis();
if (!validLifetime) {
throw new DoNotRetryIOException(String.format("Transaction %s has exceeded max lifetime %s ms",
tx.getTransactionId(), txMaxLifetimeMillis));
@@ -454,28 +442,47 @@ public class TransactionProcessor extends BaseRegionObserver {
return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
}
- private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) {
- Configuration conf = getConfiguration(c.getEnvironment());
- // Configuration won't be null in TransactionProcessor but the derived classes might return
- // null if it is not available temporarily
+ /**
+ * Refresh the properties related to transaction pruning. This method needs to be invoked if there is change in the
+ * prune related properties after clearing the state by calling {@link #resetPruneState}.
+ *
+ * @param env {@link RegionCoprocessorEnvironment} of this region
+ */
+ protected void initializePruneState(RegionCoprocessorEnvironment env) {
+ Configuration conf = getConfiguration(env);
if (conf != null) {
pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+
if (Boolean.TRUE.equals(pruneEnable)) {
- String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
- long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
- conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
- TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
- compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval);
+ TableName pruneTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+ long pruneFlushInterval = TimeUnit.SECONDS.toMillis(conf.getLong(
+ TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+
+ compactionState = new CompactionState(env, pruneTable, pruneFlushInterval);
if (LOG.isDebugEnabled()) {
- LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table "
- + pruneTable);
+ LOG.debug(String.format("Automatic invalid list pruning is enabled for table %s. Compaction state " +
+ "will be recorded in table %s",
+ env.getRegionInfo().getTable().getNameWithNamespaceInclAsString(),
+ pruneTable.getNameWithNamespaceInclAsString()));
}
}
}
}
+ /**
+ * Stop and clear state related to pruning.
+ */
+ protected void resetPruneState() {
+ pruneEnable = false;
+ if (compactionState != null) {
+ compactionState.stop();
+ compactionState = null;
+ }
+ }
+
private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) {
long numStoreFiles = 0;
for (Store store : c.getEnvironment().getRegion().getStores()) {
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 9773a15..6bd8bab 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -22,9 +22,11 @@ import com.google.common.util.concurrent.AbstractIdleService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
@@ -45,6 +47,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
private final ConcurrentSkipListMap<byte[], Long> emptyRegions;
private volatile Thread flushThread;
+ private volatile boolean stopped;
private long lastChecked;
@@ -87,6 +90,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
@Override
protected void shutDown() throws Exception {
LOG.info("Stopping PruneUpperBoundWriter Thread.");
+ stopped = true;
if (flushThread != null) {
flushThread.interrupt();
flushThread.join(TimeUnit.SECONDS.toMillis(1));
@@ -97,27 +101,33 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
flushThread = new Thread("tephra-prune-upper-bound-writer") {
@Override
public void run() {
- while ((!isInterrupted()) && isRunning()) {
+ while ((!isInterrupted()) && (!stopped)) {
long now = System.currentTimeMillis();
if (now > (lastChecked + pruneFlushInterval)) {
// should flush data
try {
- // Record prune upper bound
- while (!pruneEntries.isEmpty()) {
- Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
- dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
- // We can now remove the entry only if the key and value match with what we wrote since it is
- // possible that a new pruneUpperBound for the same key has been added
- pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
- }
- // Record empty regions
- while (!emptyRegions.isEmpty()) {
- Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
- dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
- // We can now remove the entry only if the key and value match with what we wrote since it is
- // possible that a new value for the same key has been added
- emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
- }
+ User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ // Record prune upper bound
+ while (!pruneEntries.isEmpty()) {
+ Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
+ dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
+ // We can now remove the entry only if the key and value match with what we wrote since it is
+ // possible that a new pruneUpperBound for the same key has been added
+ pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
+ }
+ // Record empty regions
+ while (!emptyRegions.isEmpty()) {
+ Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
+ dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
+ // We can now remove the entry only if the key and value match with what we wrote since it is
+ // possible that a new value for the same key has been added
+ emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
+ }
+ return null;
+ }
+ });
} catch (IOException ex) {
LOG.warn("Cannot record prune upper bound for a region to table " +
tableName.getNameWithNamespaceInclAsString(), ex);