You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/11/09 01:32:25 UTC

[1/3] incubator-tephra git commit: Reduce test stdout output due to Travis log output limit of 4 MB

Repository: incubator-tephra
Updated Branches:
  refs/heads/master 99c7bec49 -> f5f682af6


Reduce test stdout output due to Travis log output limit of 4 MB

Signed-off-by: poorna <po...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/f5f682af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/f5f682af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/f5f682af

Branch: refs/heads/master
Commit: f5f682af68e7a7884788de5f7bba41d25d80280f
Parents: 2ae7032
Author: poorna <po...@cask.co>
Authored: Mon Nov 7 15:28:48 2016 -0800
Committer: poorna <po...@apache.org>
Committed: Tue Nov 8 17:30:47 2016 -0800

----------------------------------------------------------------------
 .travis.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/f5f682af/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 898142d..fb3a64c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -29,7 +29,7 @@ branches:
     - /^hotfix\/.*$/
     - /^release\/.*$/
 
-script: mvn test -Dsurefire.redirectTestOutputToFile=false
+script: mvn test
 
 sudo: false
 


[3/3] incubator-tephra git commit: Refactor existing test

Posted by po...@apache.org.
Refactor existing test

Signed-off-by: poorna <po...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/e56f6352
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/e56f6352
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/e56f6352

Branch: refs/heads/master
Commit: e56f635231c8281ee656cb53576433491e362742
Parents: 99c7bec
Author: poorna <po...@cask.co>
Authored: Fri Oct 28 17:46:01 2016 -0700
Committer: poorna <po...@apache.org>
Committed: Tue Nov 8 17:30:47 2016 -0800

----------------------------------------------------------------------
 tephra-hbase-compat-1.1-base/pom.xml            |   5 +
 .../tephra/hbase/AbstractHBaseTableTest.java    | 107 +++++++++++++++++++
 .../hbase/TransactionAwareHTableTest.java       |  75 ++-----------
 3 files changed, 123 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e56f6352/tephra-hbase-compat-1.1-base/pom.xml
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/pom.xml b/tephra-hbase-compat-1.1-base/pom.xml
index b0eee6d..b6a58e0 100644
--- a/tephra-hbase-compat-1.1-base/pom.xml
+++ b/tephra-hbase-compat-1.1-base/pom.xml
@@ -28,6 +28,11 @@
   <artifactId>tephra-hbase-compat-1.1-base</artifactId>
   <name>Apache Tephra HBase 1.1 Compatibility Base</name>
 
+  <properties>
+    <hadoop.version>2.5.1</hadoop.version>
+    <hbase11.version>1.1.1</hbase11.version>
+  </properties>
+
   <packaging>pom</packaging>
   <modules>
     <module>tephra-hbase-compat-1.2-cdh</module>

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e56f6352/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
new file mode 100644
index 0000000..68c43ae
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
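+ * Base class for HBase table tests: starts a shared mini cluster before the test class and shuts it down afterwards.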
+ */
+@SuppressWarnings("WeakerAccess")
+public abstract class AbstractHBaseTableTest {
+  static HBaseTestingUtility testUtil;
+  static HBaseAdmin hBaseAdmin;
+  static Configuration conf;
+
+  @BeforeClass
+  public static void startMiniCluster() throws Exception {
+    testUtil = new HBaseTestingUtility();
+    conf = testUtil.getConfiguration();
+
+    // Tune down the connection thread pool size
+    conf.setInt("hbase.hconnection.threads.core", 5);
+    conf.setInt("hbase.hconnection.threads.max", 10);
+    // Tune down handler threads in regionserver
+    conf.setInt("hbase.regionserver.handler.count", 10);
+
+    // Set to random port
+    conf.setInt("hbase.master.port", 0);
+    conf.setInt("hbase.master.info.port", 0);
+    conf.setInt("hbase.regionserver.port", 0);
+    conf.setInt("hbase.regionserver.info.port", 0);
+
+    testUtil.startMiniCluster();
+    hBaseAdmin = testUtil.getHBaseAdmin();
+  }
+
+  @AfterClass
+  public static void shutdownMiniCluster() throws Exception {
+    try {
+      if (hBaseAdmin != null) {
+        hBaseAdmin.close();
+      }
+    } finally {
+      testUtil.shutdownMiniCluster();
+    }
+  }
+
+  static HTable createTable(byte[] tableName, byte[][] columnFamilies) throws Exception {
+    return createTable(tableName, columnFamilies, false,
+                       Collections.singletonList(TransactionProcessor.class.getName()));
+  }
+
+  static HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
+                             List<String> coprocessors) throws Exception {
+    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+    for (byte[] family : columnFamilies) {
+      HColumnDescriptor columnDesc = new HColumnDescriptor(family);
+      columnDesc.setMaxVersions(Integer.MAX_VALUE);
+      columnDesc.setValue(TxConstants.PROPERTY_TTL, String.valueOf(100000)); // in millis
+      desc.addFamily(columnDesc);
+    }
+    if (existingData) {
+      desc.setValue(TxConstants.READ_NON_TX_DATA, "true");
+    }
+    // Start at the user priority; each coprocessor added below gets the next higher value
+    int priority  = Coprocessor.PRIORITY_USER;
+    // order in list is the same order that coprocessors will be invoked
+    for (String coprocessor : coprocessors) {
+      desc.addCoprocessor(coprocessor, null, ++priority, null);
+    }
+    hBaseAdmin.createTable(desc);
+    testUtil.waitTableAvailable(tableName, 5000);
+    return new HTable(testUtil.getConfiguration(), tableName);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e56f6352/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
index de1fa6b..c336712 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
@@ -19,21 +19,14 @@ package org.apache.tephra.hbase;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.OperationWithAttributes;
@@ -89,14 +82,11 @@ import static org.junit.Assert.fail;
 /**
  * Tests for TransactionAwareHTables.
  */
-public class TransactionAwareHTableTest {
+public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
   private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTableTest.class);
 
-  private static HBaseTestingUtility testUtil;
-  private static HBaseAdmin hBaseAdmin;
-  private static TransactionStateStorage txStateStorage;
-  private static TransactionManager txManager;
-  private static Configuration conf;
+  static TransactionStateStorage txStateStorage;
+  static TransactionManager txManager;
   private TransactionContext transactionContext;
   private TransactionAwareHTable transactionAwareHTable;
   private HTable hTable;
@@ -146,23 +136,6 @@ public class TransactionAwareHTableTest {
 
   @BeforeClass
   public static void setupBeforeClass() throws Exception {
-    testUtil = new HBaseTestingUtility();
-    conf = testUtil.getConfiguration();
-
-    // Tune down the connection thread pool size
-    conf.setInt("hbase.hconnection.threads.core", 5);
-    conf.setInt("hbase.hconnection.threads.max", 10);
-    // Tunn down handler threads in regionserver
-    conf.setInt("hbase.regionserver.handler.count", 10);
-
-    // Set to random port
-    conf.setInt("hbase.master.port", 0);
-    conf.setInt("hbase.master.info.port", 0);
-    conf.setInt("hbase.regionserver.port", 0);
-    conf.setInt("hbase.regionserver.info.port", 0);
-
-    testUtil.startMiniCluster();
-    hBaseAdmin = testUtil.getHBaseAdmin();
     txStateStorage = new InMemoryTransactionStateStorage();
     txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
     txManager.startAndWait();
@@ -170,8 +143,9 @@ public class TransactionAwareHTableTest {
 
   @AfterClass
   public static void shutdownAfterClass() throws Exception {
-    testUtil.shutdownMiniCluster();
-    hBaseAdmin.close();
+    if (txManager != null) {
+      txManager.stopAndWait();
+    }
   }
 
   @Before
@@ -187,34 +161,6 @@ public class TransactionAwareHTableTest {
     hBaseAdmin.deleteTable(TestBytes.table);
   }
 
-  private HTable createTable(byte[] tableName, byte[][] columnFamilies) throws Exception {
-    return createTable(tableName, columnFamilies, false, Collections.<String>emptyList());
-  }
-
-  private HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData, 
-    List<String> coprocessors) throws Exception {
-    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
-    for (byte[] family : columnFamilies) {
-      HColumnDescriptor columnDesc = new HColumnDescriptor(family);
-      columnDesc.setMaxVersions(Integer.MAX_VALUE);
-      columnDesc.setValue(TxConstants.PROPERTY_TTL, String.valueOf(100000)); // in millis
-      desc.addFamily(columnDesc);
-    }
-    if (existingData) {
-      desc.setValue(TxConstants.READ_NON_TX_DATA, "true");
-    }
-    // Divide individually to prevent any overflow
-    int priority  = Coprocessor.PRIORITY_USER; 
-    desc.addCoprocessor(TransactionProcessor.class.getName(), null, priority, null);
-    // order in list is the same order that coprocessors will be invoked  
-    for (String coprocessor : coprocessors) {
-      desc.addCoprocessor(coprocessor, null, ++priority, null);
-    }
-    hBaseAdmin.createTable(desc);
-    testUtil.waitTableAvailable(tableName, 5000);
-    return new HTable(testUtil.getConfiguration(), tableName);
-   }
-
   /**
    * Test transactional put and get requests.
    *
@@ -406,7 +352,7 @@ public class TransactionAwareHTableTest {
   public void testAttributesPreserved() throws Exception {
     HTable hTable = createTable(Bytes.toBytes("TestAttributesPreserved"),
         new byte[][]{TestBytes.family, TestBytes.family2}, false,
-        Lists.newArrayList(TestRegionObserver.class.getName()));
+        Lists.newArrayList(TransactionProcessor.class.getName(), TestRegionObserver.class.getName()));
     try {
       TransactionAwareHTable txTable = new TransactionAwareHTable(hTable);
       TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
@@ -1117,7 +1063,7 @@ public class TransactionAwareHTableTest {
 
     TransactionAwareHTable txTable =
       new TransactionAwareHTable(createTable(Bytes.toBytes("testExistingData"), new byte[][]{TestBytes.family}, true, 
-      Collections.<String>emptyList()));
+      Collections.singletonList(TransactionProcessor.class.getName())));
     TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
 
     // Add some pre-existing, non-transactional data
@@ -1266,8 +1212,9 @@ public class TransactionAwareHTableTest {
 
   @Test
   public void testVisibilityAll() throws Exception {
-    HTable nonTxTable = createTable(Bytes.toBytes("testVisibilityAll"),
-      new byte[][]{TestBytes.family, TestBytes.family2}, true, Collections.<String>emptyList());
+    HTable nonTxTable =
+      createTable(Bytes.toBytes("testVisibilityAll"), new byte[][]{TestBytes.family, TestBytes.family2},
+                  true, Collections.singletonList(TransactionProcessor.class.getName()));
     TransactionAwareHTable txTable =
       new TransactionAwareHTable(nonTxTable,
                                  TxConstants.ConflictDetection.ROW); // ROW conflict detection to verify family deletes


[2/3] incubator-tephra git commit: TEPHRA-35 Save compaction state for pruning invalid list

Posted by po...@apache.org.
TEPHRA-35 Save compaction state for pruning invalid list

This closes #19

Signed-off-by: poorna <po...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/2ae70328
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/2ae70328
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/2ae70328

Branch: refs/heads/master
Commit: 2ae70328e0a5581f8b37ea694d8bd955fcdc259d
Parents: e56f635
Author: poorna <po...@cask.co>
Authored: Fri Oct 28 15:12:23 2016 -0700
Committer: poorna <po...@apache.org>
Committed: Tue Nov 8 17:30:47 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/tephra/TxConstants.java     |  10 +
 .../java/org/apache/tephra/util/TxUtils.java    |  17 ++
 .../org/apache/tephra/util/TxUtilsTest.java     |  22 ++
 .../hbase/coprocessor/TransactionProcessor.java |  33 ++-
 .../coprocessor/janitor/CompactionState.java    |  92 ++++++++
 .../coprocessor/janitor/DataJanitorState.java   |  71 +++++++
 .../tephra/hbase/AbstractHBaseTableTest.java    |   4 +-
 .../tephra/hbase/InvalidListPruneTest.java      | 210 +++++++++++++++++++
 8 files changed, 456 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2ae70328/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
index b9a7929..25451b3 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
@@ -345,4 +345,14 @@ public class TxConstants {
     public static final byte CURRENT_VERSION = 3;
   }
 
+  /**
+   * Configuration for data janitor
+   */
+  public static final class DataJanitor {
+    public static final String PRUNE_ENABLE = "data.tx.prune.enable";
+    public static final String PRUNE_STATE_TABLE = "data.tx.prune.state.table";
+
+    public static final boolean DEFAULT_PRUNE_ENABLE = false;
+    public static final String DEFAULT_PRUNE_STATE_TABLE = "data_tx_janitor_state";
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2ae70328/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
index 08b1545..b3d4ace 100644
--- a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
+++ b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
@@ -149,4 +149,21 @@ public class TxUtils {
   public static boolean isPreExistingVersion(long version) {
     return version < MAX_NON_TX_TIMESTAMP;
   }
+
+  /**
+   * Returns the maximum transaction that can be removed from the invalid list for the state represented by the given
+   * transaction.
+   */
+  public static long getPruneUpperBound(Transaction tx) {
+    // If there are no invalid transactions, and no in-progress transactions then we can prune the invalid list
+    // up to the current read pointer
+    if (tx.getInvalids().length == 0 && tx.getInProgress().length == 0) {
+      return tx.getReadPointer() - 1;
+    }
+
+    long maxInvalidTx =
+      tx.getInvalids().length > 0 ? tx.getInvalids()[tx.getInvalids().length - 1] : Transaction.NO_TX_IN_PROGRESS;
+    long firstInProgress = tx.getFirstInProgress();
+    return Math.min(maxInvalidTx, firstInProgress - 1);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2ae70328/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java b/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java
index 7743105..db687fe 100644
--- a/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java
@@ -19,6 +19,7 @@
 package org.apache.tephra.util;
 
 import org.apache.tephra.Transaction;
+import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -32,4 +33,25 @@ public class TxUtilsTest {
     // make sure we don't overflow with MAX_VALUE write pointer
     assertEquals(Long.MAX_VALUE, TxUtils.getMaxVisibleTimestamp(Transaction.ALL_VISIBLE_LATEST));
   }
+
+  @Test
+  public void testPruneUpperBound() {
+    Transaction tx = new Transaction(100, 100, new long[] {10, 30}, new long[] {80, 90}, 80);
+    Assert.assertEquals(30, TxUtils.getPruneUpperBound(tx));
+
+    tx = new Transaction(100, 100, new long[] {10, 95}, new long[] {80, 90}, 80);
+    Assert.assertEquals(79, TxUtils.getPruneUpperBound(tx));
+
+    tx = new Transaction(100, 110, new long[] {10}, new long[] {}, Transaction.NO_TX_IN_PROGRESS);
+    Assert.assertEquals(10, TxUtils.getPruneUpperBound(tx));
+
+    tx = new Transaction(100, 110, new long[] {}, new long[] {60}, 60);
+    Assert.assertEquals(59, TxUtils.getPruneUpperBound(tx));
+
+    tx = new Transaction(100, 110, new long[] {}, new long[] {50}, 50);
+    Assert.assertEquals(49, TxUtils.getPruneUpperBound(tx));
+
+    tx = new Transaction(100, 110, new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS);
+    Assert.assertEquals(99, TxUtils.getPruneUpperBound(tx));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2ae70328/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 14941b3..9f723d6 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -54,6 +56,7 @@ import org.apache.tephra.TransactionCodec;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
+import org.apache.tephra.hbase.coprocessor.janitor.CompactionState;
 import org.apache.tephra.persist.TransactionVisibilityState;
 import org.apache.tephra.util.TxUtils;
 
@@ -99,6 +102,7 @@ public class TransactionProcessor extends BaseRegionObserver {
 
   private TransactionStateCache cache;
   private final TransactionCodec txCodec;
+  private CompactionState compactionState;
   protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
   protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
   protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;
@@ -138,6 +142,16 @@ public class TransactionProcessor extends BaseRegionObserver {
       if (readNonTxnData) {
         LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString());
       }
+
+      boolean pruneEnabled = env.getConfiguration().getBoolean(TxConstants.DataJanitor.PRUNE_ENABLE,
+                                                               TxConstants.DataJanitor.DEFAULT_PRUNE_ENABLE);
+      if (pruneEnabled) {
+        String pruneTable = env.getConfiguration().get(TxConstants.DataJanitor.PRUNE_STATE_TABLE,
+                                                       TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE);
+        compactionState = new CompactionState(env, TableName.valueOf(pruneTable));
+        LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " +
+                    pruneTable);
+      }
     }
   }
 
@@ -268,10 +282,27 @@ public class TransactionProcessor extends BaseRegionObserver {
       List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
       CompactionRequest request)
       throws IOException {
-    return createStoreScanner(c.getEnvironment(), "compaction", cache.getLatestState(), store, scanners,
+    // Get the latest tx snapshot state for the compaction
+    TransactionVisibilityState snapshot = cache.getLatestState();
+
+    // Record tx state before the compaction
+    if (compactionState != null) {
+      compactionState.record(request, snapshot);
+    }
+    // Also make sure to use the same snapshot for the compaction
+    return createStoreScanner(c.getEnvironment(), "compaction", snapshot, store, scanners,
                               scanType, earliestPutTs);
   }
 
+  @Override
+  public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile,
+                          CompactionRequest request) throws IOException {
+    // Persist the compaction state after a successful compaction
+    if (compactionState != null) {
+      compactionState.persist();
+    }
+  }
+
   protected InternalScanner createStoreScanner(RegionCoprocessorEnvironment env, String action,
                                                TransactionVisibilityState snapshot, Store store,
                                                List<? extends KeyValueScanner> scanners, ScanType type,

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2ae70328/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/CompactionState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/CompactionState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/CompactionState.java
new file mode 100644
index 0000000..d02456a
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/CompactionState.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor.janitor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.apache.tephra.util.TxUtils;
+
+import java.io.IOException;
+import javax.annotation.Nullable;
+
+/**
+ * Record compaction state for invalid list pruning
+ */
+public class CompactionState {
+  private static final Log LOG = LogFactory.getLog(CompactionState.class);
+
+  private final byte[] regionName;
+  private final String regionNameAsString;
+  private final TableName stateTable;
+  private final DataJanitorState dataJanitorState;
+  private volatile long pruneUpperBound = -1;
+
+  public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable) {
+    this.regionName = env.getRegionInfo().getRegionName();
+    this.regionNameAsString = env.getRegionInfo().getRegionNameAsString();
+    this.stateTable = stateTable;
+    this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+      @Override
+      public Table get() throws IOException {
+        return env.getTable(stateTable);
+      }
+    });
+  }
+
+  /**
+   * Records the transaction state used for a compaction. This method is called when the compaction starts.
+   *
+   * @param request {@link CompactionRequest} for the compaction
+   * @param snapshot transaction state that will be used for the compaction
+   */
+  public void record(CompactionRequest request, @Nullable TransactionVisibilityState snapshot) {
+    if (request.isMajor() && snapshot != null) {
+      Transaction tx = TxUtils.createDummyTransaction(snapshot);
+      pruneUpperBound = TxUtils.getPruneUpperBound(tx);
+      LOG.debug(
+        String.format("Computed prune upper bound %s for compaction request %s using transaction state from time %s",
+                      pruneUpperBound, request, snapshot.getTimestamp()));
+    } else {
+      pruneUpperBound = -1;
+    }
+  }
+
+  /**
+   * Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}.
+   * This method is called after the compaction has successfully completed.
+   */
+  public void persist() {
+    if (pruneUpperBound != -1) {
+      try {
+        dataJanitorState.savePruneUpperBound(regionName, pruneUpperBound);
+        LOG.debug(String.format("Saved prune upper bound %s for region %s", pruneUpperBound, regionNameAsString));
+      } catch (IOException e) {
+        LOG.warn(String.format("Cannot record prune upper bound in table %s after compacting region %s",
+                               stateTable, regionNameAsString), e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2ae70328/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
new file mode 100644
index 0000000..9d4f279
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor.janitor;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+
+/**
+ * Reads and writes data janitor state (the prune upper bound of each region) in an HBase table.
+ */
+public class DataJanitorState {
+  /** Column family that holds the state columns. */
+  public static final byte[] FAMILY = {'f'};
+  private static final byte[] PRUNE_UPPER_BOUND_COL = {'u'};
+  private static final byte[] REGION_KEY_PREFIX = {0x1};
+
+  private final TableSupplier stateTableSupplier;
+
+  /** Creates an instance that obtains the state table from {@code stateTableSupplier} on every operation. */
+  public DataJanitorState(TableSupplier stateTableSupplier) {
+    this.stateTableSupplier = stateTableSupplier;
+  }
+
+  /** Stores {@code pruneUpperBound} in the row derived from {@code regionId}. */
+  public void savePruneUpperBound(byte[] regionId, long pruneUpperBound) throws IOException {
+    try (Table table = stateTableSupplier.get()) {
+      byte[] value = Bytes.toBytes(pruneUpperBound);
+      table.put(new Put(makeRegionKey(regionId)).addColumn(FAMILY, PRUNE_UPPER_BOUND_COL, value));
+    }
+  }
+
+  /** Returns the prune upper bound stored for {@code regionId}, or -1 when no value is found. */
+  public long getPruneUpperBound(byte[] regionId) throws IOException {
+    try (Table table = stateTableSupplier.get()) {
+      Get get = new Get(makeRegionKey(regionId)).addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+      byte[] stored = table.get(get).getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
+      return stored != null ? Bytes.toLong(stored) : -1;
+    }
+  }
+
+  /** Builds the state-table row key for a region: the region key prefix followed by {@code regionId}. */
+  private byte[] makeRegionKey(byte[] regionId) {
+    return Bytes.add(REGION_KEY_PREFIX, regionId);
+  }
+
+  /** Supplies the table used for persisting state. */
+  public interface TableSupplier {
+    Table get() throws IOException;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2ae70328/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
index 68c43ae..cb8d695 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
@@ -36,7 +36,7 @@ import java.util.Collections;
 import java.util.List;
 
 /**
- *
+ * Base class for tests that need an HBase cluster
  */
 @SuppressWarnings("WeakerAccess")
 public abstract class AbstractHBaseTableTest {
@@ -46,7 +46,7 @@ public abstract class AbstractHBaseTableTest {
 
   @BeforeClass
   public static void startMiniCluster() throws Exception {
-    testUtil = new HBaseTestingUtility();
+    testUtil = conf == null ? new HBaseTestingUtility() : new HBaseTestingUtility(conf);
     conf = testUtil.getConfiguration();
 
     // Tune down the connection thread pool size

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2ae70328/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/InvalidListPruneTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/InvalidListPruneTest.java
new file mode 100644
index 0000000..ebf58eb
--- /dev/null
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/InvalidListPruneTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedMap;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.TransactionStateCache;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.hbase.coprocessor.janitor.DataJanitorState;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.TransactionSnapshot;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+
+/**
+ * Test invalid list pruning
+ */
+public class InvalidListPruneTest extends AbstractHBaseTableTest {
+  private static final byte[] family = Bytes.toBytes("f1");
+  private static final byte[] qualifier = Bytes.toBytes("col1");
+
+  private static TableName dataTable;
+  private static TableName pruneStateTable;
+
+  // Hides AbstractHBaseTableTest.startMiniCluster so that the configuration is set up before the cluster starts
+  @BeforeClass
+  public static void startMiniCluster() throws Exception {
+    // Setup the configuration to start HBase cluster with the invalid list pruning enabled
+    conf = HBaseConfiguration.create();
+    conf.setBoolean(TxConstants.DataJanitor.PRUNE_ENABLE, true);
+    AbstractHBaseTableTest.startMiniCluster();
+
+    TransactionStateStorage txStateStorage = new InMemoryTransactionStateStorage();
+    TransactionManager txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
+    txManager.startAndWait();
+
+    // Write ten rows into a newly created data table within a single transaction
+    dataTable = TableName.valueOf("invalidListPruneTestTable");
+    HTable hTable = createTable(dataTable.getName(), new byte[][]{family}, false,
+                                Collections.singletonList(TestTransactionProcessor.class.getName()));
+    try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
+      TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
+      txContext.start();
+      for (int i = 0; i < 10; ++i) {
+        txTable.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, Bytes.toBytes(i)));
+      }
+      txContext.finish();
+    }
+
+    testUtil.flush(dataTable);
+    txManager.stopAndWait();
+
+    pruneStateTable = TableName.valueOf(conf.get(TxConstants.DataJanitor.PRUNE_STATE_TABLE,
+                                                 TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE));
+  }
+
+  @AfterClass
+  public static void shutdownAfterClass() throws Exception { // removes the data table made in startMiniCluster
+    hBaseAdmin.disableTable(dataTable);
+    hBaseAdmin.deleteTable(dataTable);
+  }
+
+  @Before
+  public void beforeTest() throws Exception {
+    // The prune state table is non-transactional, so it is created without any transaction co-processor.
+    // Only the table itself is needed here; the returned handle is closed right away.
+    createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
+                Collections.<String>emptyList()).close();
+  }
+
+  @After
+  public void afterTest() throws Exception { // removes the prune state table made in beforeTest
+    hBaseAdmin.disableTable(pruneStateTable);
+    hBaseAdmin.deleteTable(pruneStateTable);
+  }
+
+  @Test
+  public void testRecordCompactionState() throws Exception {
+    DataJanitorState dataJanitorState =
+      new DataJanitorState(new DataJanitorState.TableSupplier() {
+        @Override
+        public Table get() throws IOException {
+          return testUtil.getConnection().getTable(pruneStateTable);
+        }
+      });
+
+    // The prune state table was just created, so no prune upper bound is stored yet (-1 is returned)
+    Assert.assertEquals(-1, dataJanitorState.getPruneUpperBound(getRegionName(dataTable, Bytes.toBytes(0))));
+
+    // Publish a transaction snapshot for the co-processor; presumably 50 is the only invalid transaction in it
+    InMemoryTransactionStateCache.setTransactionSnapshot(
+      new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
+                              ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+    // Run minor compaction
+    testUtil.compact(dataTable, false);
+    // A minor compaction must not store a prune upper bound either
+    Assert.assertEquals(-1, dataJanitorState.getPruneUpperBound(getRegionName(dataTable, Bytes.toBytes(0))));
+
+    // Run major compaction, and verify that a prune upper bound is now stored
+    testUtil.compact(dataTable, true);
+    Assert.assertEquals(50, dataJanitorState.getPruneUpperBound(getRegionName(dataTable, Bytes.toBytes(0))));
+
+    // Run major compaction again with same snapshot, prune upper bound should not change
+    testUtil.compact(dataTable, true);
+    Assert.assertEquals(50, dataJanitorState.getPruneUpperBound(getRegionName(dataTable, Bytes.toBytes(0))));
+
+    // Publish a new transaction snapshot; the stored bound must stay at 50 until the next major compaction
+    InMemoryTransactionStateCache.setTransactionSnapshot(
+      new TransactionSnapshot(110, 111, 112, ImmutableSet.of(150L),
+                              ImmutableSortedMap.of(
+                                105L, new TransactionManager.InProgressTx(100, 30, TransactionType.SHORT)
+                              )
+    ));
+    Assert.assertEquals(50, dataJanitorState.getPruneUpperBound(getRegionName(dataTable, Bytes.toBytes(0))));
+
+    // Run major compaction again, now the prune upper bound should reflect the new snapshot
+    testUtil.compact(dataTable, true);
+    Assert.assertEquals(104, dataJanitorState.getPruneUpperBound(getRegionName(dataTable, Bytes.toBytes(0))));
+  }
+
+  private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException {
+    try (org.apache.hadoop.hbase.client.RegionLocator locator = testUtil.getConnection().getRegionLocator(dataTable)) {
+      return locator.getRegionLocation(row).getRegionInfo().getRegionName(); // locator is closed on exit
+    }
+  }
+
+  /**
+   * A transaction co-processor whose state cache supplier hands out {@link InMemoryTransactionStateCache} for testing
+   */
+  @SuppressWarnings("WeakerAccess")
+  public static class TestTransactionProcessor extends TransactionProcessor {
+    @Override
+    protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+      return new Supplier<TransactionStateCache>() {
+        @Override
+        public TransactionStateCache get() {
+          return new InMemoryTransactionStateCache(); // a new instance on every call
+        }
+      };
+    }
+  }
+
+  /**
+   * Supplies the in-memory state set via {@link #setTransactionSnapshot} to {@link TestTransactionProcessor} in tests.
+   */
+  @SuppressWarnings("WeakerAccess")
+  public static class InMemoryTransactionStateCache extends TransactionStateCache {
+    private static TransactionVisibilityState transactionSnapshot; // NOTE(review): not volatile - verify thread use
+
+    public static void setTransactionSnapshot(TransactionVisibilityState transactionSnapshot) {
+      InMemoryTransactionStateCache.transactionSnapshot = transactionSnapshot;
+    }
+
+    @Override
+    protected void startUp() throws Exception {
+      // Nothing to do: the state is set directly through setTransactionSnapshot
+    }
+
+    @Override
+    protected void shutDown() throws Exception {
+      // Nothing to do
+    }
+
+    @Override
+    public TransactionVisibilityState getLatestState() {
+      return transactionSnapshot;
+    }
+  }
+}