You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by go...@apache.org on 2017/03/07 00:34:17 UTC

[4/9] incubator-tephra git commit: TEPHRA-224 Handle delay between transaction max lifetime check and data writes while pruning

TEPHRA-224 Handle delay between transaction max lifetime check and data writes while pruning

This closes #38

Signed-off-by: poorna <po...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/808ed2e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/808ed2e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/808ed2e3

Branch: refs/heads/master
Commit: 808ed2e3fe40c86cf05442e0d842bbc844ddd857
Parents: 872fb10
Author: poorna <po...@cask.co>
Authored: Tue Feb 21 17:15:20 2017 -0800
Committer: poorna <po...@apache.org>
Committed: Wed Feb 22 14:49:16 2017 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/tephra/TxConstants.java    |  6 ++++++
 .../tephra/txprune/TransactionPruningRunnable.java      | 12 ++++++++++--
 .../tephra/txprune/TransactionPruningService.java       |  9 ++++++---
 .../tephra/txprune/TransactionPruningServiceTest.java   | 10 ++++++----
 4 files changed, 28 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/808ed2e3/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
index ebf91e3..26a48fb 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
@@ -376,6 +376,11 @@ public class TxConstants {
     public static final String PRUNE_FLUSH_INTERVAL = "data.tx.prune.flush.interval";
 
     /**
+     * The grace period, in seconds, added to the transaction max lifetime when computing the inactive transaction bound for pruning.
+     */
+    public static final String PRUNE_GRACE_PERIOD = "data.tx.grace.period";
+
+    /**
      * Comma separated list of invalid transaction pruning plugins to load
      */
     public static final String PLUGINS = "data.tx.prune.plugins";
@@ -388,6 +393,7 @@ public class TxConstants {
     public static final String DEFAULT_PRUNE_STATE_TABLE = "tephra.state";
     public static final long DEFAULT_PRUNE_INTERVAL = TimeUnit.HOURS.toSeconds(6);
     public static final long DEFAULT_PRUNE_FLUSH_INTERVAL = TimeUnit.MINUTES.toSeconds(1);
+    public static final long DEFAULT_PRUNE_GRACE_PERIOD = TimeUnit.HOURS.toSeconds(24);
     public static final String DEFAULT_PLUGIN = "data.tx.prune.plugin.default";
     public static final String DEFAULT_PLUGIN_CLASS =
       "org.apache.tephra.hbase.txprune.HBaseTransactionPruningPlugin";

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/808ed2e3/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java
index d73c50a..89ed25e 100644
--- a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java
+++ b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java
@@ -36,18 +36,21 @@ import java.util.TreeSet;
  * This class executes one run of transaction pruning every time it is invoked.
  * Typically, this class will be scheduled to run periodically.
  */
+@SuppressWarnings("WeakerAccess")
 public class TransactionPruningRunnable implements Runnable {
   private static final Logger LOG = LoggerFactory.getLogger(TransactionPruningRunnable.class);
 
   private final TransactionManager txManager;
   private final Map<String, TransactionPruningPlugin> plugins;
   private final long txMaxLifetimeMillis;
+  private final long txPruneBufferMillis;
 
   public TransactionPruningRunnable(TransactionManager txManager, Map<String, TransactionPruningPlugin> plugins,
-                                    long txMaxLifetimeMillis) {
+                                    long txMaxLifetimeMillis, long txPruneBufferMillis) {
     this.txManager = txManager;
     this.plugins = plugins;
     this.txMaxLifetimeMillis = txMaxLifetimeMillis;
+    this.txPruneBufferMillis = txPruneBufferMillis;
   }
 
   @Override
@@ -57,8 +60,13 @@ public class TransactionPruningRunnable implements Runnable {
       Transaction tx = txManager.startShort();
       txManager.abort(tx);
 
+      if (tx.getInvalids().length == 0) {
+        LOG.info("Invalid list is empty, not running transaction pruning");
+        return;
+      }
+
       long now = getTime();
-      long inactiveTransactionBound = TxUtils.getInactiveTxBound(now, txMaxLifetimeMillis);
+      long inactiveTransactionBound = TxUtils.getInactiveTxBound(now, txMaxLifetimeMillis + txPruneBufferMillis);
       LOG.info("Starting invalid prune run for time {} and inactive transaction bound {}",
                now, inactiveTransactionBound);
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/808ed2e3/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java
index d80bbd4..8d7fe2f 100644
--- a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java
+++ b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java
@@ -78,8 +78,11 @@ public class TransactionPruningService extends AbstractIdleService {
     Map<String, TransactionPruningPlugin> plugins = initializePlugins();
     long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
                                                                      TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
+    long txPruneBufferMillis =
+      TimeUnit.SECONDS.toMillis(conf.getLong(TxConstants.TransactionPruning.PRUNE_GRACE_PERIOD,
+                                            TxConstants.TransactionPruning.DEFAULT_PRUNE_GRACE_PERIOD));
     scheduledExecutorService.scheduleAtFixedRate(
-      getTxPruneRunnable(txManager, plugins, txMaxLifetimeMillis),
+      getTxPruneRunnable(txManager, plugins, txMaxLifetimeMillis, txPruneBufferMillis),
       scheduleInterval, scheduleInterval, TimeUnit.SECONDS);
     LOG.info("Scheduled {} plugins with interval {} seconds", plugins.size(), scheduleInterval);
   }
@@ -104,8 +107,8 @@ public class TransactionPruningService extends AbstractIdleService {
   @VisibleForTesting
   TransactionPruningRunnable getTxPruneRunnable(TransactionManager txManager,
                                                 Map<String, TransactionPruningPlugin> plugins,
-                                                long txMaxLifetimeMillis) {
-    return new TransactionPruningRunnable(txManager, plugins, txMaxLifetimeMillis);
+                                                long txMaxLifetimeMillis, long txPruneBufferMillis) {
+    return new TransactionPruningRunnable(txManager, plugins, txMaxLifetimeMillis, txPruneBufferMillis);
   }
 
   private Map<String, TransactionPruningPlugin> initializePlugins()

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/808ed2e3/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java b/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java
index 9c23ab7..2a0a17e 100644
--- a/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java
@@ -69,6 +69,7 @@ public class TransactionPruningServiceTest {
     conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
     conf.setInt(TxConstants.TransactionPruning.PRUNE_INTERVAL, 1);
     conf.setInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, 10);
+    conf.setLong(TxConstants.TransactionPruning.PRUNE_GRACE_PERIOD, 0);
 
     // Setup mock data
     long m = 1000;
@@ -132,6 +133,7 @@ public class TransactionPruningServiceTest {
     conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
     conf.setInt(TxConstants.TransactionPruning.PRUNE_INTERVAL, 1);
     conf.setInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, 10);
+    conf.setLong(TxConstants.TransactionPruning.PRUNE_GRACE_PERIOD, 0);
 
     // Setup mock data
     long m = 1000;
@@ -222,8 +224,8 @@ public class TransactionPruningServiceTest {
     @Override
     TransactionPruningRunnable getTxPruneRunnable(TransactionManager txManager,
                                                   Map<String, TransactionPruningPlugin> plugins,
-                                                  long txMaxLifetimeMillis) {
-      return new TestTransactionPruningRunnable(txManager, plugins, txMaxLifetimeMillis);
+                                                  long txMaxLifetimeMillis, long txPruneBufferMillis) {
+      return new TestTransactionPruningRunnable(txManager, plugins, txMaxLifetimeMillis, txPruneBufferMillis);
     }
   }
 
@@ -233,8 +235,8 @@ public class TransactionPruningServiceTest {
   private static class TestTransactionPruningRunnable extends TransactionPruningRunnable {
     private static Iterator<Long> currentTime;
     TestTransactionPruningRunnable(TransactionManager txManager, Map<String, TransactionPruningPlugin> plugins,
-                                   long txMaxLifetimeMillis) {
-      super(txManager, plugins, txMaxLifetimeMillis);
+                                   long txMaxLifetimeMillis, long txPruneBufferMillis) {
+      super(txManager, plugins, txMaxLifetimeMillis, txPruneBufferMillis);
     }
 
     @Override