You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2017/02/22 22:49:42 UTC
incubator-tephra git commit: TEPHRA-224 Handle delay between
transaction max lifetime check and data writes while pruning
Repository: incubator-tephra
Updated Branches:
refs/heads/release/0.11.0-incubating 872fb1090 -> 808ed2e3f
TEPHRA-224 Handle delay between transaction max lifetime check and data writes while pruning
This closes #38
Signed-off-by: poorna <po...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/808ed2e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/808ed2e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/808ed2e3
Branch: refs/heads/release/0.11.0-incubating
Commit: 808ed2e3fe40c86cf05442e0d842bbc844ddd857
Parents: 872fb10
Author: poorna <po...@cask.co>
Authored: Tue Feb 21 17:15:20 2017 -0800
Committer: poorna <po...@apache.org>
Committed: Wed Feb 22 14:49:16 2017 -0800
----------------------------------------------------------------------
.../src/main/java/org/apache/tephra/TxConstants.java | 6 ++++++
.../tephra/txprune/TransactionPruningRunnable.java | 12 ++++++++++--
.../tephra/txprune/TransactionPruningService.java | 9 ++++++---
.../tephra/txprune/TransactionPruningServiceTest.java | 10 ++++++----
4 files changed, 28 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/808ed2e3/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
index ebf91e3..26a48fb 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
@@ -376,6 +376,11 @@ public class TxConstants {
public static final String PRUNE_FLUSH_INTERVAL = "data.tx.prune.flush.interval";
/**
+ * The time in seconds used to pad transaction max lifetime while pruning.
+ */
+ public static final String PRUNE_GRACE_PERIOD = "data.tx.grace.period";
+
+ /**
* Comma separated list of invalid transaction pruning plugins to load
*/
public static final String PLUGINS = "data.tx.prune.plugins";
@@ -388,6 +393,7 @@ public class TxConstants {
public static final String DEFAULT_PRUNE_STATE_TABLE = "tephra.state";
public static final long DEFAULT_PRUNE_INTERVAL = TimeUnit.HOURS.toSeconds(6);
public static final long DEFAULT_PRUNE_FLUSH_INTERVAL = TimeUnit.MINUTES.toSeconds(1);
+ public static final long DEFAULT_PRUNE_GRACE_PERIOD = TimeUnit.HOURS.toSeconds(24);
public static final String DEFAULT_PLUGIN = "data.tx.prune.plugin.default";
public static final String DEFAULT_PLUGIN_CLASS =
"org.apache.tephra.hbase.txprune.HBaseTransactionPruningPlugin";
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/808ed2e3/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java
index d73c50a..89ed25e 100644
--- a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java
+++ b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java
@@ -36,18 +36,21 @@ import java.util.TreeSet;
* This class executes one run of transaction pruning every time it is invoked.
* Typically, this class will be scheduled to run periodically.
*/
+@SuppressWarnings("WeakerAccess")
public class TransactionPruningRunnable implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(TransactionPruningRunnable.class);
private final TransactionManager txManager;
private final Map<String, TransactionPruningPlugin> plugins;
private final long txMaxLifetimeMillis;
+ private final long txPruneBufferMillis;
public TransactionPruningRunnable(TransactionManager txManager, Map<String, TransactionPruningPlugin> plugins,
- long txMaxLifetimeMillis) {
+ long txMaxLifetimeMillis, long txPruneBufferMillis) {
this.txManager = txManager;
this.plugins = plugins;
this.txMaxLifetimeMillis = txMaxLifetimeMillis;
+ this.txPruneBufferMillis = txPruneBufferMillis;
}
@Override
@@ -57,8 +60,13 @@ public class TransactionPruningRunnable implements Runnable {
Transaction tx = txManager.startShort();
txManager.abort(tx);
+ if (tx.getInvalids().length == 0) {
+ LOG.info("Invalid list is empty, not running transaction pruning");
+ return;
+ }
+
long now = getTime();
- long inactiveTransactionBound = TxUtils.getInactiveTxBound(now, txMaxLifetimeMillis);
+ long inactiveTransactionBound = TxUtils.getInactiveTxBound(now, txMaxLifetimeMillis + txPruneBufferMillis);
LOG.info("Starting invalid prune run for time {} and inactive transaction bound {}",
now, inactiveTransactionBound);
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/808ed2e3/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java
index d80bbd4..8d7fe2f 100644
--- a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java
+++ b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java
@@ -78,8 +78,11 @@ public class TransactionPruningService extends AbstractIdleService {
Map<String, TransactionPruningPlugin> plugins = initializePlugins();
long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
+ long txPruneBufferMillis =
+ TimeUnit.SECONDS.toMillis(conf.getLong(TxConstants.TransactionPruning.PRUNE_GRACE_PERIOD,
+ TxConstants.TransactionPruning.DEFAULT_PRUNE_GRACE_PERIOD));
scheduledExecutorService.scheduleAtFixedRate(
- getTxPruneRunnable(txManager, plugins, txMaxLifetimeMillis),
+ getTxPruneRunnable(txManager, plugins, txMaxLifetimeMillis, txPruneBufferMillis),
scheduleInterval, scheduleInterval, TimeUnit.SECONDS);
LOG.info("Scheduled {} plugins with interval {} seconds", plugins.size(), scheduleInterval);
}
@@ -104,8 +107,8 @@ public class TransactionPruningService extends AbstractIdleService {
@VisibleForTesting
TransactionPruningRunnable getTxPruneRunnable(TransactionManager txManager,
Map<String, TransactionPruningPlugin> plugins,
- long txMaxLifetimeMillis) {
- return new TransactionPruningRunnable(txManager, plugins, txMaxLifetimeMillis);
+ long txMaxLifetimeMillis, long txPruneBufferMillis) {
+ return new TransactionPruningRunnable(txManager, plugins, txMaxLifetimeMillis, txPruneBufferMillis);
}
private Map<String, TransactionPruningPlugin> initializePlugins()
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/808ed2e3/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java b/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java
index 9c23ab7..2a0a17e 100644
--- a/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java
@@ -69,6 +69,7 @@ public class TransactionPruningServiceTest {
conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
conf.setInt(TxConstants.TransactionPruning.PRUNE_INTERVAL, 1);
conf.setInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, 10);
+ conf.setLong(TxConstants.TransactionPruning.PRUNE_GRACE_PERIOD, 0);
// Setup mock data
long m = 1000;
@@ -132,6 +133,7 @@ public class TransactionPruningServiceTest {
conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
conf.setInt(TxConstants.TransactionPruning.PRUNE_INTERVAL, 1);
conf.setInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, 10);
+ conf.setLong(TxConstants.TransactionPruning.PRUNE_GRACE_PERIOD, 0);
// Setup mock data
long m = 1000;
@@ -222,8 +224,8 @@ public class TransactionPruningServiceTest {
@Override
TransactionPruningRunnable getTxPruneRunnable(TransactionManager txManager,
Map<String, TransactionPruningPlugin> plugins,
- long txMaxLifetimeMillis) {
- return new TestTransactionPruningRunnable(txManager, plugins, txMaxLifetimeMillis);
+ long txMaxLifetimeMillis, long txPruneBufferMillis) {
+ return new TestTransactionPruningRunnable(txManager, plugins, txMaxLifetimeMillis, txPruneBufferMillis);
}
}
@@ -233,8 +235,8 @@ public class TransactionPruningServiceTest {
private static class TestTransactionPruningRunnable extends TransactionPruningRunnable {
private static Iterator<Long> currentTime;
TestTransactionPruningRunnable(TransactionManager txManager, Map<String, TransactionPruningPlugin> plugins,
- long txMaxLifetimeMillis) {
- super(txManager, plugins, txMaxLifetimeMillis);
+ long txMaxLifetimeMillis, long txPruneBufferMillis) {
+ super(txManager, plugins, txMaxLifetimeMillis, txPruneBufferMillis);
}
@Override