Posted to commits@tephra.apache.org by an...@apache.org on 2017/09/06 08:01:16 UTC

[5/5] incubator-tephra git commit: Support for HBase 1.3.x

Support for HBase 1.3.x

This closes #50 from GitHub.

Signed-off-by: anew <an...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/8f958edb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/8f958edb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/8f958edb

Branch: refs/heads/master
Commit: 8f958edb6cbd6078ab0d13ccba14467285800a77
Parents: fd6ef73
Author: anew <an...@apache.org>
Authored: Tue Sep 5 13:44:08 2017 -0700
Committer: anew <an...@apache.org>
Committed: Wed Sep 6 00:56:37 2017 -0700

----------------------------------------------------------------------
 pom.xml                                         |    2 +
 .../tephra/util/ConfigurationFactory.java       |    5 +
 .../org/apache/tephra/util/HBaseVersion.java    |    4 +
 .../util/HBaseVersionSpecificFactory.java       |    7 +-
 .../apache/tephra/util/HBaseVersionTest.java    |    3 +
 tephra-examples/hbase-1.3/pom.xml               |  122 ++
 tephra-examples/pom.xml                         |    1 +
 tephra-hbase-compat-1.3/pom.xml                 |  146 ++
 .../hbase/HBase13ConfigurationProvider.java     |   38 +
 .../tephra/hbase/SecondaryIndexTable.java       |  178 ++
 .../tephra/hbase/TransactionAwareHTable.java    |  699 +++++++
 .../hbase/coprocessor/CellSkipFilter.java       |  138 ++
 .../hbase/coprocessor/TransactionFilters.java   |   62 +
 .../hbase/coprocessor/TransactionProcessor.java |  529 ++++++
 .../TransactionVisibilityFilter.java            |  313 ++++
 .../tephra/hbase/txprune/CompactionState.java   |  112 ++
 .../tephra/hbase/txprune/DataJanitorState.java  |  536 ++++++
 .../txprune/HBaseTransactionPruningPlugin.java  |  373 ++++
 .../hbase/txprune/InvalidListPruningDebug.java  |  294 +++
 .../hbase/txprune/PruneUpperBoundWriter.java    |  164 ++
 .../txprune/PruneUpperBoundWriterSupplier.java  |   55 +
 .../tephra/hbase/txprune/TimeRegions.java       |   85 +
 .../tephra/hbase/AbstractHBaseTableTest.java    |  106 ++
 .../hbase/HBase13ConfigurationProviderTest.java |   35 +
 .../hbase/TransactionAwareHTableTest.java       | 1726 ++++++++++++++++++
 .../hbase/coprocessor/CellSkipFilterTest.java   |  123 ++
 .../coprocessor/TransactionProcessorTest.java   |  624 +++++++
 .../TransactionVisibilityFilterTest.java        |  374 ++++
 .../hbase/txprune/DataJanitorStateTest.java     |  285 +++
 .../hbase/txprune/InvalidListPruneTest.java     |  459 +++++
 .../PruneUpperBoundWriterSupplierTest.java      |  122 ++
 31 files changed, 7719 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 79f7a33..71b54ad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -157,6 +157,7 @@
     <module>tephra-hbase-compat-1.0</module>
     <module>tephra-hbase-compat-1.0-cdh</module>
     <module>tephra-hbase-compat-1.1-base</module>
+    <module>tephra-hbase-compat-1.3</module>
     <module>tephra-examples</module>
     <module>tephra-distribution</module>
   </modules>
@@ -212,6 +213,7 @@
     <hbase11.version>1.1.1</hbase11.version>
     <hbase12cdh.version>1.2.0-cdh5.7.0</hbase12cdh.version>
     <hbase12.version>1.2.0</hbase12.version>
+    <hbase13.version>1.3.1</hbase13.version>
     <junit.version>4.11</junit.version>
     <slf4j.version>1.7.5</slf4j.version>
     <thrift.version>0.9.0</thrift.version>

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationFactory.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationFactory.java b/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationFactory.java
index 2703e60..a2eee8a 100644
--- a/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationFactory.java
+++ b/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationFactory.java
@@ -48,6 +48,11 @@ public class ConfigurationFactory implements Provider<Configuration> {
     }
 
     @Override
+    protected String getHBase13Classname() {
+      return "org.apache.tephra.hbase.HBase13ConfigurationProvider";
+    }
+
+    @Override
     protected String getHBase10CDHClassname() {
       return "org.apache.tephra.hbase.HBase10ConfigurationProvider";
     }
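
For context, a minimal sketch of how the new provider class is picked up at runtime (the demo class is hypothetical; ConfigurationFactory implements Provider<Configuration> and resolves the version-specific class reflectively via the factory shown further below):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.tephra.util.ConfigurationFactory;

    public class ConfigurationFactoryDemo {
      public static void main(String[] args) {
        // get() dispatches on HBaseVersion.get(); with HBase 1.3 on the classpath
        // it loads org.apache.tephra.hbase.HBase13ConfigurationProvider by name.
        Configuration conf = new ConfigurationFactory().get();
        System.out.println(conf.get("hbase.zookeeper.quorum"));
      }
    }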

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersion.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersion.java b/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersion.java
index e868c6b..814c6f1 100644
--- a/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersion.java
+++ b/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersion.java
@@ -37,6 +37,7 @@ public class HBaseVersion {
   private static final String HBASE_10_VERSION = "1.0";
   private static final String HBASE_11_VERSION = "1.1";
   private static final String HBASE_12_VERSION = "1.2";
+  private static final String HBASE_13_VERSION = "1.3";
   private static final String CDH_CLASSIFIER = "cdh";
 
   private static final Logger LOG = LoggerFactory.getLogger(HBaseVersion.class);
@@ -52,6 +53,7 @@ public class HBaseVersion {
     HBASE_10_CDH("1.0-cdh"),
     HBASE_11("1.1"),
     HBASE_12("1.2"),
+    HBASE_13("1.3"),
     UNKNOWN("unknown");
 
     final String majorVersion;
@@ -89,6 +91,8 @@ public class HBaseVersion {
         currentVersion = Version.HBASE_11;
       } else if (versionString.startsWith(HBASE_12_VERSION)) {
         currentVersion = Version.HBASE_12;
+      } else if (versionString.startsWith(HBASE_13_VERSION)) {
+        currentVersion = Version.HBASE_13;
       } else {
         currentVersion = Version.UNKNOWN;
       }
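
A small usage sketch of the version detection added above (the demo class is hypothetical; HBaseVersion.get(), Version.HBASE_13, and getVersionString() are the names appearing in this diff):

    import org.apache.tephra.util.HBaseVersion;

    public class VersionCheckDemo {
      public static void main(String[] args) {
        // get() reads the HBase version string from the classpath and matches
        // it by prefix, so any 1.3.x release maps onto Version.HBASE_13.
        if (HBaseVersion.get() == HBaseVersion.Version.HBASE_13) {
          System.out.println("HBase version: " + HBaseVersion.getVersionString());
        }
      }
    }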

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersionSpecificFactory.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersionSpecificFactory.java b/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersionSpecificFactory.java
index 9153296..ca8db5a 100644
--- a/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersionSpecificFactory.java
+++ b/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersionSpecificFactory.java
@@ -31,7 +31,7 @@ import org.apache.twill.internal.utils.Instances;
 public abstract class HBaseVersionSpecificFactory<T> implements Provider<T> {
   @Override
   public T get() {
-    T instance = null;
+    T instance;
     try {
       switch (HBaseVersion.get()) {
         case HBASE_94:
@@ -52,7 +52,11 @@ public abstract class HBaseVersionSpecificFactory<T> implements Provider<T> {
         case HBASE_12:
           instance = createInstance(getHBase11Classname());
           break;
+        case HBASE_13:
+          instance = createInstance(getHBase13Classname());
+          break;
         case UNKNOWN:
+        default:
           throw new ProvisionException("Unknown HBase version: " + HBaseVersion.getVersionString());
       }
     } catch (ClassNotFoundException cnfe) {
@@ -71,4 +75,5 @@ public abstract class HBaseVersionSpecificFactory<T> implements Provider<T> {
   protected abstract String getHBase10Classname();
   protected abstract String getHBase10CDHClassname();
   protected abstract String getHBase11Classname();
+  protected abstract String getHBase13Classname();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-core/src/test/java/org/apache/tephra/util/HBaseVersionTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/util/HBaseVersionTest.java b/tephra-core/src/test/java/org/apache/tephra/util/HBaseVersionTest.java
index 27f4032..e114afc 100644
--- a/tephra-core/src/test/java/org/apache/tephra/util/HBaseVersionTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/util/HBaseVersionTest.java
@@ -84,6 +84,9 @@ public class HBaseVersionTest {
 
     ver = HBaseVersion.VersionNumber.create("1.2.1");
     assertVersionNumber(ver, 1, 2, 1, null, false);
+
+    ver = HBaseVersion.VersionNumber.create("1.3.1");
+    assertVersionNumber(ver, 1, 3, 1, null, false);
   }
 
   private void assertVersionNumber(HBaseVersion.VersionNumber version, Integer expectedMajor, Integer expectedMinor,

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-examples/hbase-1.3/pom.xml
----------------------------------------------------------------------
diff --git a/tephra-examples/hbase-1.3/pom.xml b/tephra-examples/hbase-1.3/pom.xml
new file mode 100644
index 0000000..aa9d31b
--- /dev/null
+++ b/tephra-examples/hbase-1.3/pom.xml
@@ -0,0 +1,122 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>org.apache.tephra</groupId>
+    <artifactId>tephra-examples</artifactId>
+    <version>0.13.0-incubating-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>tephra-examples-hbase-1.3</artifactId>
+  <name>Apache Tephra Examples For HBase 1.3</name>
+
+  <properties>
+    <hadoop.version>2.5.1</hadoop.version>
+    <hbase13.version>1.3.1</hbase13.version>
+  </properties>
+
+  <build>
+    <sourceDirectory>../src/main/java</sourceDirectory>
+    <testSourceDirectory>../src/test/java</testSourceDirectory>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.tephra</groupId>
+      <artifactId>tephra-hbase-compat-1.3</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <version>${hbase13.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <version>${hbase13.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-protocol</artifactId>
+      <version>${hbase13.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase13.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase13.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <version>${hbase13.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>asm</groupId>
+          <artifactId>asm</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-examples/pom.xml
----------------------------------------------------------------------
diff --git a/tephra-examples/pom.xml b/tephra-examples/pom.xml
index b34a6b6..03c99c0 100644
--- a/tephra-examples/pom.xml
+++ b/tephra-examples/pom.xml
@@ -36,6 +36,7 @@
     <module>hbase-1.0-cdh</module>
     <module>hbase-1.1</module>
     <module>hbase-1.2</module>
+    <module>hbase-1.3</module>
     <module>cdh-5.7</module>
     <module>cdh-5.8</module>
   </modules>

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/pom.xml
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/pom.xml b/tephra-hbase-compat-1.3/pom.xml
new file mode 100644
index 0000000..ef651a7
--- /dev/null
+++ b/tephra-hbase-compat-1.3/pom.xml
@@ -0,0 +1,146 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>org.apache.tephra</groupId>
+    <artifactId>tephra</artifactId>
+    <version>0.13.0-incubating-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>tephra-hbase-compat-1.3</artifactId>
+  <name>Apache Tephra HBase 1.3 Compatibility</name>
+
+  <properties>
+    <hadoop.version>2.5.1</hadoop.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.tephra</groupId>
+      <artifactId>tephra-api</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tephra</groupId>
+      <artifactId>tephra-core</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <version>${hbase13.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <version>${hbase13.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-protocol</artifactId>
+      <version>${hbase13.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase13.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.apache.tephra</groupId>
+      <artifactId>tephra-core</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>log4j-over-slf4j</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>jcl-over-slf4j</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase13.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <version>${hbase13.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>asm</groupId>
+          <artifactId>asm</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/HBase13ConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/HBase13ConfigurationProvider.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/HBase13ConfigurationProvider.java
new file mode 100644
index 0000000..72efff5
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/HBase13ConfigurationProvider.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.tephra.util.ConfigurationProvider;
+
+/**
+ * HBase 1.3 version of {@link ConfigurationProvider}.
+ */
+public class HBase13ConfigurationProvider extends ConfigurationProvider {
+  @Override
+  public Configuration get() {
+    return HBaseConfiguration.create();
+  }
+
+  @Override
+  public Configuration get(Configuration baseConf) {
+    return HBaseConfiguration.create(baseConf);
+  }
+}
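
A quick sketch of the provider used directly (in practice it is instantiated reflectively via ConfigurationFactory; the quorum value here is just an assumption for the demo):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.tephra.hbase.HBase13ConfigurationProvider;

    public class ProviderDemo {
      public static void main(String[] args) {
        Configuration base = new Configuration();
        base.set("hbase.zookeeper.quorum", "localhost");
        // get(baseConf) layers hbase-default.xml/hbase-site.xml over the given conf
        Configuration conf = new HBase13ConfigurationProvider().get(base);
        System.out.println(conf.get("hbase.zookeeper.quorum"));
      }
    }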

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/SecondaryIndexTable.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/SecondaryIndexTable.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/SecondaryIndexTable.java
new file mode 100644
index 0000000..8bf8768
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/SecondaryIndexTable.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tephra.hbase;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionFailureException;
+import org.apache.tephra.distributed.TransactionServiceClient;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A Transactional SecondaryIndexTable.
+ */
+public class SecondaryIndexTable implements Closeable {
+  private byte[] secondaryIndex;
+  private TransactionAwareHTable transactionAwareHTable;
+  private TransactionAwareHTable secondaryIndexTable;
+  private TransactionContext transactionContext;
+  private final TableName secondaryIndexTableName;
+  private static final byte[] secondaryIndexFamily = Bytes.toBytes("secondaryIndexFamily");
+  private static final byte[] secondaryIndexQualifier = Bytes.toBytes('r');
+  private static final byte[] DELIMITER = new byte[] {0};
+
+  public SecondaryIndexTable(TransactionServiceClient transactionServiceClient, HTableInterface hTable,
+                             byte[] secondaryIndex) {
+    secondaryIndexTableName = TableName.valueOf(hTable.getName().getNameAsString() + ".idx");
+    HTable secondaryIndexHTable = null;
+    try (HBaseAdmin hBaseAdmin = new HBaseAdmin(hTable.getConfiguration())) {
+      if (!hBaseAdmin.tableExists(secondaryIndexTableName)) {
+        hBaseAdmin.createTable(new HTableDescriptor(secondaryIndexTableName));
+      }
+      secondaryIndexHTable = new HTable(hTable.getConfiguration(), secondaryIndexTableName);
+    } catch (Exception e) {
+      Throwables.propagate(e);
+    }
+
+    this.secondaryIndex = secondaryIndex;
+    this.transactionAwareHTable = new TransactionAwareHTable(hTable);
+    this.secondaryIndexTable = new TransactionAwareHTable(secondaryIndexHTable);
+    this.transactionContext = new TransactionContext(transactionServiceClient, transactionAwareHTable,
+                                                     secondaryIndexTable);
+  }
+
+  public Result get(Get get) throws IOException {
+    return get(Collections.singletonList(get))[0];
+  }
+
+  public Result[] get(List<Get> gets) throws IOException {
+    try {
+      transactionContext.start();
+      Result[] result = transactionAwareHTable.get(gets);
+      transactionContext.finish();
+      return result;
+    } catch (Exception e) {
+      try {
+        transactionContext.abort();
+      } catch (TransactionFailureException e1) {
+        throw new IOException("Could not rollback transaction", e1);
+      }
+    }
+    return null;
+  }
+
+  public Result[] getByIndex(byte[] value) throws IOException {
+    try {
+      transactionContext.start();
+      byte[] startRow = Bytes.add(secondaryIndex, DELIMITER, value);
+      Scan scan = new Scan(startRow, Bytes.add(startRow, new byte[]{(byte) 0xff}));
+      scan.addColumn(secondaryIndexFamily, secondaryIndexQualifier);
+      ResultScanner indexScanner = secondaryIndexTable.getScanner(scan);
+
+      ArrayList<Get> gets = new ArrayList<>();
+      for (Result result : indexScanner) {
+        for (Cell cell : result.listCells()) {
+          gets.add(new Get(cell.getValue()));
+        }
+      }
+      Result[] results = transactionAwareHTable.get(gets);
+      transactionContext.finish();
+      return results;
+    } catch (Exception e) {
+      try {
+        transactionContext.abort();
+      } catch (TransactionFailureException e1) {
+        throw new IOException("Could not rollback transaction", e1);
+      }
+    }
+    return null;
+  }
+
+  public void put(Put put) throws IOException {
+    put(Collections.singletonList(put));
+  }
+
+
+  public void put(List<Put> puts) throws IOException {
+    try {
+      transactionContext.start();
+      ArrayList<Put> secondaryIndexPuts = new ArrayList<>();
+      for (Put put : puts) {
+        List<Put> indexPuts = new ArrayList<>();
+        Set<Map.Entry<byte[], List<KeyValue>>> familyMap = put.getFamilyMap().entrySet();
+        for (Map.Entry<byte [], List<KeyValue>> family : familyMap) {
+          for (KeyValue value : family.getValue()) {
+            if (Bytes.equals(value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(),
+                             secondaryIndex, 0, secondaryIndex.length)) {
+              byte[] secondaryRow = Bytes.add(value.getQualifier(), DELIMITER,
+                                                    Bytes.add(value.getValue(), DELIMITER,
+                                                              value.getRow()));
+              Put indexPut = new Put(secondaryRow);
+              indexPut.add(secondaryIndexFamily, secondaryIndexQualifier, put.getRow());
+              indexPuts.add(indexPut);
+            }
+          }
+        }
+        secondaryIndexPuts.addAll(indexPuts);
+      }
+      transactionAwareHTable.put(puts);
+      secondaryIndexTable.put(secondaryIndexPuts);
+      transactionContext.finish();
+    } catch (Exception e) {
+      try {
+        transactionContext.abort();
+      } catch (TransactionFailureException e1) {
+        throw new IOException("Could not rollback transaction", e1);
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      transactionAwareHTable.close();
+    } catch (IOException e) {
+      try {
+        secondaryIndexTable.close();
+      } catch (IOException ex) {
+        e.addSuppressed(ex);
+      }
+      throw e;
+    }
+    secondaryIndexTable.close();
+  }
+}
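
A usage sketch for SecondaryIndexTable (table, family, and qualifier names are made up; a running transaction service and HBase cluster are assumed):

    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.tephra.distributed.TransactionServiceClient;
    import org.apache.tephra.hbase.SecondaryIndexTable;

    public class SecondaryIndexDemo {
      public static void run(TransactionServiceClient txClient, HTable people) throws Exception {
        try (SecondaryIndexTable indexed =
               new SecondaryIndexTable(txClient, people, Bytes.toBytes("name"))) {
          Put put = new Put(Bytes.toBytes("row1"));
          put.add(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("alice"));
          indexed.put(put);  // writes the data row and the index row in one transaction
          Result[] hits = indexed.getByIndex(Bytes.toBytes("alice"));
          System.out.println("matches: " + hits.length);
        }
      }
    }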

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
new file mode 100644
index 0000000..ed1dbb1
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
@@ -0,0 +1,699 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tephra.hbase;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.OperationWithAttributes;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.tephra.AbstractTransactionAwareTable;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionAware;
+import org.apache.tephra.TxConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+
+/**
+ * A Transaction Aware HTable implementation for HBase 1.3. Operations are committed as usual,
+ * but upon a failed or aborted transaction, they are rolled back to the state before the transaction
+ * was started.
+ */
+public class TransactionAwareHTable extends AbstractTransactionAwareTable
+    implements HTableInterface, TransactionAware {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTable.class);
+  private final HTableInterface hTable;
+
+  /**
+   * Create a transaction-aware instance of the passed HTable
+   *
+   * @param hTable underlying HBase table to use
+   */
+  public TransactionAwareHTable(HTableInterface hTable) {
+    this(hTable, false);
+  }
+
+  /**
+   * Create a transaction-aware instance of the passed HTable
+   *
+   * @param hTable underlying HBase table to use
+   * @param conflictLevel level of conflict detection to perform (defaults to {@code COLUMN})
+   */
+  public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel) {
+    this(hTable, conflictLevel, false);
+  }
+
+  /**
+   * Create a transaction-aware instance of the passed HTable, with the option
+   * of allowing non-transactional operations.
+   * @param hTable underlying HBase table to use
+   * @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete)
+   *                              will be available, though non-transactional
+   */
+  public TransactionAwareHTable(HTableInterface hTable, boolean allowNonTransactional) {
+    this(hTable, TxConstants.ConflictDetection.COLUMN, allowNonTransactional);
+  }
+
+  /**
+   * Create a transaction-aware instance of the passed HTable, with the option
+   * of allowing non-transactional operations.
+   * @param hTable underlying HBase table to use
+   * @param conflictLevel level of conflict detection to perform (defaults to {@code COLUMN})
+   * @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete)
+   *                              will be available, though non-transactional
+   */
+  public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel,
+                                boolean allowNonTransactional) {
+    super(conflictLevel, allowNonTransactional);
+    this.hTable = hTable;
+  }
+
+  /* AbstractTransactionAwareTable implementation */
+
+  @Override
+  protected byte[] getTableKey() {
+    return getTableName();
+  }
+
+  @Override
+  protected boolean doCommit() throws IOException {
+    hTable.flushCommits();
+    return true;
+  }
+
+  @Override
+  protected boolean doRollback() throws Exception {
+    try {
+      // pre-size arraylist of deletes
+      int size = 0;
+      for (Set<ActionChange> cs : changeSets.values()) {
+        size += cs.size();
+      }
+      List<Delete> rollbackDeletes = new ArrayList<>(size);
+      for (Map.Entry<Long, Set<ActionChange>> entry : changeSets.entrySet()) {
+        long transactionTimestamp = entry.getKey();
+        for (ActionChange change : entry.getValue()) {
+          byte[] row = change.getRow();
+          byte[] family = change.getFamily();
+          byte[] qualifier = change.getQualifier();
+          Delete rollbackDelete = new Delete(row);
+          makeRollbackOperation(rollbackDelete);
+          switch (conflictLevel) {
+            case ROW:
+            case NONE:
+              // issue family delete for the tx write pointer
+              rollbackDelete.deleteFamilyVersion(change.getFamily(), transactionTimestamp);
+              break;
+            case COLUMN:
+              if (family != null && qualifier == null) {
+                rollbackDelete.deleteFamilyVersion(family, transactionTimestamp);
+              } else if (family != null && qualifier != null) {
+                rollbackDelete.deleteColumn(family, qualifier, transactionTimestamp);
+              }
+              break;
+            default:
+              throw new IllegalStateException("Unknown conflict detection level: " + conflictLevel);
+          }
+          rollbackDeletes.add(rollbackDelete);
+        }
+      }
+      hTable.delete(rollbackDeletes);
+      return true;
+    } finally {
+      try {
+        hTable.flushCommits();
+      } catch (Exception e) {
+        LOG.error("Could not flush HTable commits", e);
+      }
+      tx = null;
+      changeSets.clear();
+    }
+  }
+
+  /* HTableInterface implementation */
+
+  @Override
+  public byte[] getTableName() {
+    return hTable.getTableName();
+  }
+
+  @Override
+  public TableName getName() {
+    return hTable.getName();
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return hTable.getConfiguration();
+  }
+
+  @Override
+  public HTableDescriptor getTableDescriptor() throws IOException {
+    return hTable.getTableDescriptor();
+  }
+
+  @Override
+  public boolean exists(Get get) throws IOException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    return hTable.exists(transactionalizeAction(get));
+  }
+
+  @Override
+  public Boolean[] exists(List<Get> gets) throws IOException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    List<Get> transactionalizedGets = new ArrayList<>(gets.size());
+    for (Get get : gets) {
+      transactionalizedGets.add(transactionalizeAction(get));
+    }
+    return hTable.exists(transactionalizedGets);
+  }
+
+  @Override
+  public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    hTable.batch(transactionalizeActions(actions), results);
+  }
+
+  @Override
+  public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    return hTable.batch(transactionalizeActions(actions));
+  }
+
+  @Override
+  public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) throws
+    IOException, InterruptedException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    hTable.batchCallback(transactionalizeActions(actions), results, callback);
+  }
+
+  @Override
+  public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback) throws IOException,
+    InterruptedException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    return hTable.batchCallback(transactionalizeActions(actions), callback);
+  }
+
+  @Override
+  public Result get(Get get) throws IOException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    return hTable.get(transactionalizeAction(get));
+  }
+
+  @Override
+  public Result[] get(List<Get> gets) throws IOException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    ArrayList<Get> transactionalizedGets = new ArrayList<>();
+    for (Get get : gets) {
+      transactionalizedGets.add(transactionalizeAction(get));
+    }
+    return hTable.get(transactionalizedGets);
+  }
+
+  @Override
+  public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
+    if (allowNonTransactional) {
+      return hTable.getRowOrBefore(row, family);
+    } else {
+      throw new UnsupportedOperationException("Operation is not supported transactionally");
+    }
+  }
+
+  @Override
+  public ResultScanner getScanner(Scan scan) throws IOException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    return hTable.getScanner(transactionalizeAction(scan));
+  }
+
+  @Override
+  public ResultScanner getScanner(byte[] family) throws IOException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    Scan scan = new Scan();
+    scan.addFamily(family);
+    return hTable.getScanner(transactionalizeAction(scan));
+  }
+
+  @Override
+  public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    Scan scan = new Scan();
+    scan.addColumn(family, qualifier);
+    return hTable.getScanner(transactionalizeAction(scan));
+  }
+
+  @Override
+  public void put(Put put) throws IOException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    Put txPut = transactionalizeAction(put);
+    hTable.put(txPut);
+  }
+
+  @Override
+  public void put(List<Put> puts) throws IOException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    List<Put> transactionalizedPuts = new ArrayList<>(puts.size());
+    for (Put put : puts) {
+      Put txPut = transactionalizeAction(put);
+      transactionalizedPuts.add(txPut);
+    }
+    hTable.put(transactionalizedPuts);
+  }
+
+  @Override
+  public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException {
+    if (allowNonTransactional) {
+      return hTable.checkAndPut(row, family, qualifier, value, put);
+    } else {
+      throw new UnsupportedOperationException("Operation is not supported transactionally");
+    }
+  }
+
+  @Override
+  public void delete(Delete delete) throws IOException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    hTable.delete(transactionalizeAction(delete));
+  }
+
+  @Override
+  public void delete(List<Delete> deletes) throws IOException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    List<Delete> transactionalizedDeletes = new ArrayList<>(deletes.size());
+    for (Delete delete : deletes) {
+      Delete txDelete = transactionalizeAction(delete);
+      transactionalizedDeletes.add(txDelete);
+    }
+    hTable.delete(transactionalizedDeletes);
+  }
+
+  @Override
+  public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete)
+    throws IOException {
+    if (allowNonTransactional) {
+      return hTable.checkAndDelete(row, family, qualifier, value, delete);
+    } else {
+      throw new UnsupportedOperationException("Operation is not supported transactionally");
+    }
+  }
+
+  @Override
+  public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp,
+                                byte[] bytes3, Delete delete) throws IOException {
+    if (allowNonTransactional) {
+      return hTable.checkAndDelete(bytes, bytes1, bytes2, compareOp, bytes3, delete);
+    } else {
+      throw new UnsupportedOperationException("Operation is not supported transactionally");
+    }
+  }
+
+  @Override
+  public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp,
+                             byte[] bytes3, Put put) throws IOException {
+    if (allowNonTransactional) {
+      return hTable.checkAndPut(bytes, bytes1, bytes2, compareOp, bytes3, put);
+    } else {
+      throw new UnsupportedOperationException("Operation is not supported transactionally");
+    }
+  }
+
+  @Override
+  public boolean[] existsAll(List<Get> gets) throws IOException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    List<Get> transactionalizedGets = new ArrayList<>(gets.size());
+    for (Get get : gets) {
+      transactionalizedGets.add(transactionalizeAction(get));
+    }
+    return hTable.existsAll(transactionalizedGets);
+  }
+
+  @Override
+  public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
+                                CompareFilter.CompareOp compareOp, byte[] value, RowMutations rowMutations)
+      throws IOException {
+    if (allowNonTransactional) {
+      return hTable.checkAndMutate(row, family, qualifier, compareOp, value, rowMutations);
+    }
+
+    throw new UnsupportedOperationException("checkAndMutate operation is not supported transactionally");
+  }
+
+  @Override
+  public void setOperationTimeout(int operationTimeout) {
+    hTable.setOperationTimeout(operationTimeout);
+  }
+
+  @Override
+  public int getOperationTimeout() {
+    return hTable.getOperationTimeout();
+  }
+
+  @Override
+  public void setRpcTimeout(int rpcTimeout) {
+    hTable.setRpcTimeout(rpcTimeout);
+  }
+
+  @Override
+  public int getRpcTimeout() {
+    return hTable.getRpcTimeout();
+  }
+
+  @Override
+  public void mutateRow(RowMutations rm) throws IOException {
+    if (tx == null) {
+      throw new IOException("Transaction not started");
+    }
+    RowMutations transactionalMutations = new RowMutations();
+    for (Mutation mutation : rm.getMutations()) {
+      if (mutation instanceof Put) {
+        transactionalMutations.add(transactionalizeAction((Put) mutation));
+      } else if (mutation instanceof Delete) {
+        transactionalMutations.add(transactionalizeAction((Delete) mutation));
+      }
+    }
+    hTable.mutateRow(transactionalMutations);
+  }
+
+  @Override
+  public Result append(Append append) throws IOException {
+    if (allowNonTransactional) {
+      return hTable.append(append);
+    } else {
+      throw new UnsupportedOperationException("Operation is not supported transactionally");
+    }
+  }
+
+  @Override
+  public Result increment(Increment increment) throws IOException {
+    if (allowNonTransactional) {
+      return hTable.increment(increment);
+    } else {
+      throw new UnsupportedOperationException("Operation is not supported transactionally");
+    }
+  }
+
+  @Override
+  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException {
+    if (allowNonTransactional) {
+      return hTable.incrementColumnValue(row, family, qualifier, amount);
+    } else {
+      throw new UnsupportedOperationException("Operation is not supported transactionally");
+    }
+  }
+
+  @Override
+  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability)
+    throws IOException {
+    if (allowNonTransactional) {
+      return hTable.incrementColumnValue(row, family, qualifier, amount, durability);
+    } else {
+      throw new UnsupportedOperationException("Operation is not supported transactionally");
+    }
+  }
+
+  @Override
+  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
+    throws IOException {
+    if (allowNonTransactional) {
+      return hTable.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
+    } else {
+      throw new UnsupportedOperationException("Operation is not supported transactionally");
+    }
+  }
+
+  @Override
+  public boolean isAutoFlush() {
+    return hTable.isAutoFlush();
+  }
+
+  @Override
+  public void flushCommits() throws IOException {
+    hTable.flushCommits();
+  }
+
+  @Override
+  public void close() throws IOException {
+    hTable.close();
+  }
+
+  @Override
+  public CoprocessorRpcChannel coprocessorService(byte[] row) {
+    return hTable.coprocessorService(row);
+  }
+
+  @Override
+  public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey,
+                                                                  Batch.Call<T, R> callable)
+    throws ServiceException, Throwable {
+    return hTable.coprocessorService(service, startKey, endKey, callable);
+  }
+
+  @Override
+  public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey,
+                                                        Batch.Call<T, R> callable, Batch.Callback<R> callback)
+    throws ServiceException, Throwable {
+    hTable.coprocessorService(service, startKey, endKey, callable, callback);
+  }
+
+  @Override
+  public <R extends Message> Map<byte[], R> batchCoprocessorService(
+      MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey,
+      R responsePrototype) throws ServiceException, Throwable {
+    return hTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype);
+  }
+
+  @Override
+  public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor,
+      Message request, byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
+      throws ServiceException, Throwable {
+    hTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, callback);
+  }
+
+  @Override
+  public void setAutoFlush(boolean autoFlush) {
+    setAutoFlushTo(autoFlush);
+  }
+
+  @Override
+  public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
+    hTable.setAutoFlush(autoFlush, clearBufferOnFail);
+  }
+
+  @Override
+  public void setAutoFlushTo(boolean autoFlush) {
+    hTable.setAutoFlushTo(autoFlush);
+  }
+
+  @Override
+  public long getWriteBufferSize() {
+    return hTable.getWriteBufferSize();
+  }
+
+  @Override
+  public void setWriteBufferSize(long writeBufferSize) throws IOException {
+    hTable.setWriteBufferSize(writeBufferSize);
+  }
+
+  // Helpers to transactionalize actions: Get/Scan are annotated in place, while
+  // Put/Delete are copied with the transaction's write pointer as the timestamp.
+
+  private Get transactionalizeAction(Get get) throws IOException {
+    addToOperation(get, tx);
+    return get;
+  }
+
+  private Scan transactionalizeAction(Scan scan) throws IOException {
+    addToOperation(scan, tx);
+    return scan;
+  }
+
+  private Put transactionalizeAction(Put put) throws IOException {
+    Put txPut = new Put(put.getRow(), tx.getWritePointer());
+    Set<Map.Entry<byte[], List<Cell>>> familyMap = put.getFamilyCellMap().entrySet();
+    if (!familyMap.isEmpty()) {
+      for (Map.Entry<byte[], List<Cell>> family : familyMap) {
+        List<Cell> familyValues = family.getValue();
+        if (!familyValues.isEmpty()) {
+          for (Cell value : familyValues) {
+            txPut.add(value.getFamily(), value.getQualifier(), tx.getWritePointer(), value.getValue());
+            addToChangeSet(txPut.getRow(), value.getFamily(), value.getQualifier());
+          }
+        }
+      }
+    }
+    for (Map.Entry<String, byte[]> entry : put.getAttributesMap().entrySet()) {
+      txPut.setAttribute(entry.getKey(), entry.getValue());
+    }
+    txPut.setDurability(put.getDurability());
+    addToOperation(txPut, tx);
+    return txPut;
+  }
+
+  private Delete transactionalizeAction(Delete delete) throws IOException {
+    long transactionTimestamp = tx.getWritePointer();
+
+    byte[] deleteRow = delete.getRow();
+    Delete txDelete = new Delete(deleteRow, transactionTimestamp);
+
+    Map<byte[], List<Cell>> familyToDelete = delete.getFamilyCellMap();
+    if (familyToDelete.isEmpty()) {
+      // perform a row delete if we are using row-level conflict detection
+      if (conflictLevel == TxConstants.ConflictDetection.ROW ||
+        conflictLevel == TxConstants.ConflictDetection.NONE) {
+        // Row delete leaves delete markers in all column families of the table
+        // Therefore get all the column families of the hTable from the HTableDescriptor and add them to the changeSet
+        for (HColumnDescriptor columnDescriptor : hTable.getTableDescriptor().getColumnFamilies()) {
+          // no need to identify individual columns deleted
+          addToChangeSet(deleteRow, columnDescriptor.getName(), null);
+        }
+      } else {
+        Result result = get(new Get(delete.getRow()));
+        // Delete everything
+        NavigableMap<byte[], NavigableMap<byte[], byte[]>> resultMap = result.getNoVersionMap();
+        for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> familyEntry : resultMap.entrySet()) {
+          NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(familyEntry.getKey());
+          for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
+            txDelete.deleteColumns(familyEntry.getKey(), column.getKey(), transactionTimestamp);
+            addToChangeSet(deleteRow, familyEntry.getKey(), column.getKey());
+          }
+        }
+      }
+    } else {
+      for (Map.Entry<byte [], List<Cell>> familyEntry : familyToDelete.entrySet()) {
+        byte[] family = familyEntry.getKey();
+        List<Cell> entries = familyEntry.getValue();
+        boolean isFamilyDelete = false;
+        if (entries.size() == 1) {
+          Cell cell = entries.get(0);
+          isFamilyDelete = CellUtil.isDeleteFamily(cell);
+        }
+        if (isFamilyDelete) {
+          if (conflictLevel == TxConstants.ConflictDetection.ROW ||
+              conflictLevel == TxConstants.ConflictDetection.NONE) {
+            // no need to identify individual columns deleted
+            txDelete.deleteFamily(family);
+            addToChangeSet(deleteRow, family, null);
+          } else {
+            Result result = get(new Get(delete.getRow()).addFamily(family));
+            // Delete entire family
+            NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(family);
+            for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
+              txDelete.deleteColumns(family, column.getKey(), transactionTimestamp);
+              addToChangeSet(deleteRow, family, column.getKey());
+            }
+          }
+        } else {
+          for (Cell value : entries) {
+            txDelete.deleteColumns(value.getFamily(), value.getQualifier(), transactionTimestamp);
+            addToChangeSet(deleteRow, value.getFamily(), value.getQualifier());
+          }
+        }
+      }
+    }
+    for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
+      txDelete.setAttribute(entry.getKey(), entry.getValue());
+    }
+    txDelete.setDurability(delete.getDurability());
+    addToOperation(txDelete, tx);
+    return txDelete;
+  }
+
+  private List<? extends Row> transactionalizeActions(List<? extends Row> actions) throws IOException {
+    List<Row> transactionalizedActions = new ArrayList<>(actions.size());
+    for (Row action : actions) {
+      if (action instanceof Get) {
+        transactionalizedActions.add(transactionalizeAction((Get) action));
+      } else if (action instanceof Put) {
+        transactionalizedActions.add(transactionalizeAction((Put) action));
+      } else if (action instanceof Delete) {
+        transactionalizedActions.add(transactionalizeAction((Delete) action));
+      } else {
+        transactionalizedActions.add(action);
+      }
+    }
+    return transactionalizedActions;
+  }
+
+  public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException {
+    op.setAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx));
+  }
+
+  protected void makeRollbackOperation(Delete delete) {
+    delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+  }
+}
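
The canonical usage pattern for the table above, per its Javadoc: wrap the HTable, register it with a TransactionContext, and let finish()/abort() drive commit and rollback. This mirrors how SecondaryIndexTable in this commit uses the class; names below are placeholders and a running transaction service is assumed:

    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.tephra.TransactionContext;
    import org.apache.tephra.distributed.TransactionServiceClient;
    import org.apache.tephra.hbase.TransactionAwareHTable;

    public class TxHTableDemo {
      public static void run(TransactionServiceClient txClient, HTable table) throws Exception {
        TransactionAwareHTable txTable = new TransactionAwareHTable(table);
        TransactionContext context = new TransactionContext(txClient, txTable);
        context.start();                       // begin the transaction
        try {
          Put put = new Put(Bytes.toBytes("row1"));
          put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
          txTable.put(put);                    // stamped with the tx write pointer
          context.finish();                    // commit
        } catch (Exception e) {
          context.abort();                     // rolls back via the deletes in doRollback()
          throw e;
        }
      }
    }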

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/CellSkipFilter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/CellSkipFilter.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/CellSkipFilter.java
new file mode 100644
index 0000000..d8664f4
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/CellSkipFilter.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link Filter} that encapsulates another {@link Filter}. It remembers the last {@link KeyValue}
+ * for which the underlying filter returned {@link ReturnCode#NEXT_COL} or {@link ReturnCode#INCLUDE_AND_NEXT_COL},
+ * so that when {@link #filterKeyValue} is called again for the same {@link KeyValue} with a different
+ * version, it returns {@link ReturnCode#NEXT_COL} directly without consulting the underlying {@link Filter}.
+ * Please see TEPHRA-169 for more details.
+ */
+public class CellSkipFilter extends FilterBase {
+  private final Filter filter;
+  // remember the previous keyvalue processed by filter when the return code was NEXT_COL or INCLUDE_AND_NEXT_COL
+  private KeyValue skipColumn = null;
+
+  public CellSkipFilter(Filter filter) {
+    this.filter = filter;
+  }
+
+  /**
+   * Determines whether the current cell should be skipped. The cell will be skipped
+   * if the previous keyvalue had the same key as the current cell. This means the filter has already
+   * responded for the previous keyvalue with ReturnCode.NEXT_COL or ReturnCode.INCLUDE_AND_NEXT_COL.
+   * @param cell the {@link Cell} to be tested for skipping
+   * @return true if the current cell should be skipped, false otherwise
+   */
+  private boolean skipCellVersion(Cell cell) {
+    return skipColumn != null
+      && CellUtil.matchingRow(cell, skipColumn.getRowArray(), skipColumn.getRowOffset(),
+                              skipColumn.getRowLength())
+      && CellUtil.matchingFamily(cell, skipColumn.getFamilyArray(), skipColumn.getFamilyOffset(),
+                                 skipColumn.getFamilyLength())
+      && CellUtil.matchingQualifier(cell, skipColumn.getQualifierArray(), skipColumn.getQualifierOffset(),
+                                    skipColumn.getQualifierLength());
+  }
+
+  @Override
+  public ReturnCode filterKeyValue(Cell cell) throws IOException {
+    if (skipCellVersion(cell)) {
+      return ReturnCode.NEXT_COL;
+    }
+
+    ReturnCode code = filter.filterKeyValue(cell);
+    if (code == ReturnCode.NEXT_COL || code == ReturnCode.INCLUDE_AND_NEXT_COL) {
+      // only store the reference to the keyvalue if we are returning NEXT_COL or INCLUDE_AND_NEXT_COL
+      skipColumn = KeyValueUtil.createFirstOnRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+                                                   cell.getFamilyArray(), cell.getFamilyOffset(),
+                                                   cell.getFamilyLength(), cell.getQualifierArray(),
+                                                   cell.getQualifierOffset(), cell.getQualifierLength());
+    } else {
+      skipColumn = null;
+    }
+    return code;
+  }
+
+  @Override
+  public boolean filterRow() throws IOException {
+    return filter.filterRow();
+  }
+
+  @Override
+  public Cell transformCell(Cell cell) throws IOException {
+    return filter.transformCell(cell);
+  }
+
+  @Override
+  public void reset() throws IOException {
+    filter.reset();
+  }
+
+  @Override
+  public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
+    return filter.filterRowKey(buffer, offset, length);
+  }
+
+  @Override
+  public boolean filterAllRemaining() throws IOException {
+    return filter.filterAllRemaining();
+  }
+
+  @Override
+  public void filterRowCells(List<Cell> kvs) throws IOException {
+    filter.filterRowCells(kvs);
+  }
+
+  @Override
+  public boolean hasFilterRow() {
+    return filter.hasFilterRow();
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException {
+    return filter.getNextKeyHint(currentKV);
+  }
+
+  @Override
+  public Cell getNextCellHint(Cell currentKV) throws IOException {
+    return filter.getNextCellHint(currentKV);
+  }
+
+  @Override
+  public boolean isFamilyEssential(byte[] name) throws IOException {
+    return filter.isFamilyEssential(name);
+  }
+
+  @Override
+  public byte[] toByteArray() throws IOException {
+    return filter.toByteArray();
+  }
+}
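
Because CellSkipFilter only delegates, it composes with any HBase Filter. A
hedged sketch (the ValueFilter and Scan below are illustrative, not part of
this commit):

    Filter wrapped = new ValueFilter(CompareFilter.CompareOp.EQUAL,
                                     new BinaryComparator(Bytes.toBytes("v")));
    Scan scan = new Scan();
    scan.setMaxVersions();
    // Once the wrapped filter answers NEXT_COL or INCLUDE_AND_NEXT_COL for a
    // column, the wrapper short-circuits the remaining versions of that column
    // with NEXT_COL instead of consulting the wrapped filter again.
    scan.setFilter(new CellSkipFilter(wrapped));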

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
new file mode 100644
index 0000000..0ca9f9c
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.tephra.Transaction;
+
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/**
+ * Factory class for providing {@link Filter} instances.
+ */
+public class TransactionFilters {
+  /**
+   * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions.
+   *
+   * @param tx the current transaction to apply.  Only data visible to this transaction will be returned.
+   * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
+   * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
+   *                         these will be interpreted as "delete" markers and the column will be filtered out
+   * @param scanType the type of scan operation being performed
+   */
+  public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
+                                           ScanType scanType) {
+    return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null));
+  }
+
+  /**
+   * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions.
+   *
+   * @param tx the current transaction to apply.  Only data visible to this transaction will be returned.
+   * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
+   * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
+   *                         these will be interpreted as "delete" markers and the column will be filtered out
+   * @param scanType the type of scan operation being performed
+   * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
+   *                   calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}.  If null, then
+   *                   {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
+   */
+  public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
+                                           ScanType scanType, @Nullable Filter cellFilter) {
+    return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter));
+  }
+}
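
A derived coprocessor can use this factory to layer an application filter under
the visibility filter by overriding TransactionProcessor.getTransactionFilter
(defined in the next file). A hedged sketch, where MyCellFilter is a
hypothetical application-defined Filter:

    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.regionserver.ScanType;
    import org.apache.tephra.Transaction;

    public class CustomTransactionProcessor extends TransactionProcessor {
      @Override
      protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) {
        // ttlByFamily and allowEmptyValues are protected fields of the base class
        return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues,
                                                      type, new MyCellFilter(filter));
      }
    }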

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
new file mode 100644
index 0000000..553f598
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.OperationWithAttributes;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionCodec;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.CacheSupplier;
+import org.apache.tephra.coprocessor.TransactionStateCache;
+import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
+import org.apache.tephra.hbase.txprune.CompactionState;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.apache.tephra.util.TxUtils;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+/**
+ * {@code org.apache.hadoop.hbase.coprocessor.RegionObserver} coprocessor that handles server-side processing
+ * for transactions:
+ * <ul>
+ *   <li>applies filtering to exclude data from invalid and in-progress transactions</li>
+ *   <li>overrides the scanner returned for flush and compaction to drop data written by invalidated transactions,
+ *   or data that has expired due to TTL.</li>
+ * </ul>
+ *
+ * <p>In order to use this coprocessor for transactions, configure the class on any table involved in transactions,
+ * or on all user tables by adding the following to hbase-site.xml:
+ * {@code
+ * <property>
+ *   <name>hbase.coprocessor.region.classes</name>
+ *   <value>org.apache.tephra.hbase.coprocessor.TransactionProcessor</value>
+ * </property>
+ * }
+ * </p>
+ *
+ * <p>HBase {@code Get} and {@code Scan} operations should have the current transaction serialized onto the operation
+ * as an attribute:
+ * {@code
+ * Transaction t = ...;
+ * Get get = new Get(...);
+ * TransactionCodec codec = new TransactionCodec();
+ * codec.addToOperation(get, t);
+ * }
+ * </p>
+ */
+public class TransactionProcessor extends BaseRegionObserver {
+  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
+
+  private final TransactionCodec txCodec;
+  private TransactionStateCache cache;
+  private volatile CompactionState compactionState;
+  private CacheSupplier<TransactionStateCache> cacheSupplier;
+
+  protected volatile Boolean pruneEnable;
+  protected volatile Long txMaxLifetimeMillis;
+  protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+  protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
+  protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;
+
+  public TransactionProcessor() {
+    this.txCodec = new TransactionCodec();
+  }
+
+  /* RegionObserver implementation */
+
+  @Override
+  public void start(CoprocessorEnvironment e) throws IOException {
+    if (e instanceof RegionCoprocessorEnvironment) {
+      RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
+      this.cacheSupplier = getTransactionStateCacheSupplier(env);
+      this.cache = cacheSupplier.get();
+
+      HTableDescriptor tableDesc = env.getRegion().getTableDesc();
+      for (HColumnDescriptor columnDesc : tableDesc.getFamilies()) {
+        String columnTTL = columnDesc.getValue(TxConstants.PROPERTY_TTL);
+        long ttl = 0;
+        if (columnTTL != null) {
+          try {
+            ttl = Long.parseLong(columnTTL);
+            LOG.info("Family " + columnDesc.getNameAsString() + " has TTL of " + columnTTL);
+          } catch (NumberFormatException nfe) {
+            LOG.warn("Invalid TTL value configured for column family " + columnDesc.getNameAsString() +
+                       ", value = " + columnTTL);
+          }
+        }
+        ttlByFamily.put(columnDesc.getName(), ttl);
+      }
+
+      this.allowEmptyValues = getAllowEmptyValues(env, tableDesc);
+      this.txMaxLifetimeMillis = getTxMaxLifetimeMillis(env);
+      this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA));
+      if (readNonTxnData) {
+        LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString());
+      }
+      initializePruneState(env);
+    }
+  }
+
+  /**
+   * Fetch the {@link Configuration} that contains the properties required by the coprocessor. By default,
+   * the HBase configuration is returned. This method will never return {@code null} in Tephra, but derived
+   * classes might do so if the {@link Configuration} is temporarily unavailable (for example, if it is being
+   * fetched from an HBase table).
+   *
+   * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated
+   * @return {@link Configuration}, can be null if it is not available
+   */
+  @Nullable
+  protected Configuration getConfiguration(CoprocessorEnvironment env) {
+    return env.getConfiguration();
+  }
+
+  protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+    return new TransactionStateCacheSupplier(env.getConfiguration());
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment e) throws IOException {
+    try {
+      resetPruneState();
+    } finally {
+      if (cacheSupplier != null) {
+        cacheSupplier.release();
+      }
+    }
+  }
+
+  @Override
+  public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results)
+    throws IOException {
+    Transaction tx = getFromOperation(get);
+    if (tx != null) {
+      projectFamilyDeletes(get);
+      get.setMaxVersions();
+      get.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData),
+                       TxUtils.getMaxVisibleTimestamp(tx));
+      Filter newFilter = getTransactionFilter(tx, ScanType.USER_SCAN, get.getFilter());
+      get.setFilter(newFilter);
+    }
+  }
+
+  @Override
+  public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)
+    throws IOException {
+    Transaction tx = getFromOperation(put);
+    ensureValidTxLifetime(e.getEnvironment(), put, tx);
+  }
+
+  @Override
+  public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit,
+                        Durability durability) throws IOException {
+    // Translate deletes into our own delete tombstones
+    // Since HBase deletes cannot be undone, we need to translate deletes into special puts, which allows
+    // us to rollback the changes (by a real delete) if the transaction fails
+
+    // Deletes that are part of a transaction rollback do not need special handling.
+    // They will never be rolled back, so are performed as normal HBase deletes.
+    if (isRollbackOperation(delete)) {
+      return;
+    }
+
+    Transaction tx = getFromOperation(delete);
+    ensureValidTxLifetime(e.getEnvironment(), delete, tx);
+
+    // Other deletes are client-initiated and need to be translated into our own tombstones
+    // TODO: this should delegate to the DeleteStrategy implementation.
+    Put deleteMarkers = new Put(delete.getRow(), delete.getTimeStamp());
+    for (byte[] family : delete.getFamilyCellMap().keySet()) {
+      List<Cell> familyCells = delete.getFamilyCellMap().get(family);
+      if (isFamilyDelete(familyCells)) {
+        deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, familyCells.get(0).getTimestamp(),
+                          HConstants.EMPTY_BYTE_ARRAY);
+      } else {
+        for (Cell cell : familyCells) {
+          deleteMarkers.add(family, CellUtil.cloneQualifier(cell), cell.getTimestamp(),
+                            HConstants.EMPTY_BYTE_ARRAY);
+        }
+      }
+    }
+    for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
+      deleteMarkers.setAttribute(entry.getKey(), entry.getValue());
+    }
+    e.getEnvironment().getRegion().put(deleteMarkers);
+    // skip normal delete handling
+    e.bypass();
+  }
+
+  private boolean getAllowEmptyValues(RegionCoprocessorEnvironment env, HTableDescriptor htd) {
+    String allowEmptyValuesFromTableDesc = htd.getValue(TxConstants.ALLOW_EMPTY_VALUES_KEY);
+    Configuration conf = getConfiguration(env);
+    boolean allowEmptyValuesFromConfig = (conf != null) ?
+      conf.getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY, TxConstants.ALLOW_EMPTY_VALUES_DEFAULT) :
+      TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
+
+    // If the property is not present in the tableDescriptor, get it from the Configuration
+    return (allowEmptyValuesFromTableDesc != null) ?
+      Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig;
+  }
+
+  private long getTxMaxLifetimeMillis(RegionCoprocessorEnvironment env) {
+    Configuration conf = getConfiguration(env);
+    if (conf != null) {
+      return TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
+                                                   TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
+    }
+    return TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME);
+  }
+
+  private boolean isFamilyDelete(List<Cell> familyCells) {
+    return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0));
+  }
+
+  @Override
+  public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s)
+    throws IOException {
+    Transaction tx = getFromOperation(scan);
+    if (tx != null) {
+      projectFamilyDeletes(scan);
+      scan.setMaxVersions();
+      scan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData),
+                        TxUtils.getMaxVisibleTimestamp(tx));
+      Filter newFilter = getTransactionFilter(tx, ScanType.USER_SCAN, scan.getFilter());
+      scan.setFilter(newFilter);
+    }
+    return s;
+  }
+
+  /**
+   * Ensures that family delete markers are present in the columns requested for any scan operation.
+   * @param scan The original scan request
+   * @return The modified scan request with the family delete qualifiers represented
+   */
+  private Scan projectFamilyDeletes(Scan scan) {
+    for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
+      NavigableSet<byte[]> columns = entry.getValue();
+      // wildcard scans will automatically include the delete marker, so only need to add it when we have
+      // explicit columns listed
+      if (columns != null && !columns.isEmpty()) {
+        scan.addColumn(entry.getKey(), TxConstants.FAMILY_DELETE_QUALIFIER);
+      }
+    }
+    return scan;
+  }
+
+  /**
+   * Ensures that family delete markers are present in the columns requested for any get operation.
+   * @param get The original get request
+   * @return The modified get request with the family delete qualifiers represented
+   */
+  private Get projectFamilyDeletes(Get get) {
+    for (Map.Entry<byte[], NavigableSet<byte[]>> entry : get.getFamilyMap().entrySet()) {
+      NavigableSet<byte[]> columns = entry.getValue();
+      // wildcard scans will automatically include the delete marker, so only need to add it when we have
+      // explicit columns listed
+      if (columns != null && !columns.isEmpty()) {
+        get.addColumn(entry.getKey(), TxConstants.FAMILY_DELETE_QUALIFIER);
+      }
+    }
+    return get;
+  }
+
+  @Override
+  public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+                                             KeyValueScanner memstoreScanner, InternalScanner scanner)
+      throws IOException {
+    return createStoreScanner(c.getEnvironment(), "flush", cache.getLatestState(), store,
+                              Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
+                              HConstants.OLDEST_TIMESTAMP);
+  }
+
+  @Override
+  public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
+    // Record whether the region is empty after a flush
+    Region region = e.getEnvironment().getRegion();
+    // After a flush, if the memstore size is zero and there are no store files for any stores in the region
+    // then the region must be empty
+    long numStoreFiles = numStoreFilesForRegion(e);
+    long memstoreSize = region.getMemstoreSize();
+    LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s",
+                            region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles));
+    if (memstoreSize == 0 && numStoreFiles == 0) {
+      if (compactionState != null) {
+        compactionState.persistRegionEmpty(System.currentTimeMillis());
+      }
+    }
+  }
+
+  @Override
+  public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+      List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
+      CompactionRequest request)
+      throws IOException {
+    // Get the latest tx snapshot state for the compaction
+    TransactionVisibilityState snapshot = cache.getLatestState();
+
+    // Record tx state before the compaction
+    if (compactionState != null) {
+      compactionState.record(request, snapshot);
+    }
+
+    // Also make sure to use the same snapshot for the compaction
+    return createStoreScanner(c.getEnvironment(), "compaction", snapshot, store, scanners, scanType, earliestPutTs);
+  }
+
+  @Override
+  public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile,
+                          CompactionRequest request) throws IOException {
+    // Persist the compaction state after a successful compaction
+    if (compactionState != null) {
+      compactionState.persist();
+    }
+  }
+
+  protected InternalScanner createStoreScanner(RegionCoprocessorEnvironment env, String action,
+                                               TransactionVisibilityState snapshot, Store store,
+                                               List<? extends KeyValueScanner> scanners, ScanType type,
+                                               long earliestPutTs) throws IOException {
+    if (snapshot == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Region " + env.getRegion().getRegionInfo().getRegionNameAsString() +
+                    ", no current transaction state found, defaulting to normal " + action + " scanner");
+      }
+      return null;
+    }
+
+    // construct a dummy transaction from the latest snapshot
+    Transaction dummyTx = TxUtils.createDummyTransaction(snapshot);
+    Scan scan = new Scan();
+    // need to see all versions, since we filter out excludes and applications may rely on multiple versions
+    scan.setMaxVersions();
+    scan.setFilter(
+        new IncludeInProgressFilter(dummyTx.getVisibilityUpperBound(),
+            snapshot.getInvalid(),
+            getTransactionFilter(dummyTx, type, null)));
+
+    return new StoreScanner(store, store.getScanInfo(), scan, scanners,
+                            type, store.getSmallestReadPoint(), earliestPutTs);
+  }
+
+  private Transaction getFromOperation(OperationWithAttributes op) throws IOException {
+    byte[] encoded = op.getAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY);
+    if (encoded == null) {
+      // to support old clients
+      encoded = op.getAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY);
+    }
+    if (encoded != null) {
+      return txCodec.decode(encoded);
+    }
+    return null;
+  }
+
+  /**
+   * Make sure that the transaction is within the max valid transaction lifetime.
+   *
+   * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated
+   * @param op {@link OperationWithAttributes} HBase operation to access its attributes if required
+   * @param tx {@link Transaction} supplied by the client on the operation; may be {@code null}
+   * @throws DoNotRetryIOException thrown if the transaction is older than the max lifetime of a transaction
+   * @throws IOException thrown if the value of the max transaction lifetime is unavailable
+   */
+  protected void ensureValidTxLifetime(RegionCoprocessorEnvironment env,
+                                       @SuppressWarnings("unused") OperationWithAttributes op,
+                                       @Nullable Transaction tx) throws IOException {
+    if (tx == null) {
+      return;
+    }
+
+    boolean validLifetime =
+      (TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis) > System.currentTimeMillis();
+    if (!validLifetime) {
+      throw new DoNotRetryIOException(String.format("Transaction %s has exceeded max lifetime %s ms",
+                                                    tx.getTransactionId(), txMaxLifetimeMillis));
+    }
+  }
+
+  private boolean isRollbackOperation(OperationWithAttributes op) throws IOException {
+    return op.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null ||
+      // to support old clients
+      op.getAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY) != null;
+  }
+
+  /**
+   * Derived classes can override this method to customize the filter used to return data visible for the current
+   * transaction.
+   *
+   * @param tx the current transaction to apply
+   * @param type the type of scan being performed
+   */
+  protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) {
+    return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
+  }
+
+  /**
+   * Refresh the properties related to transaction pruning. This method needs to be invoked if there is a change
+   * in the prune-related properties, after clearing the existing state by calling {@link #resetPruneState}.
+   *
+   * @param env {@link RegionCoprocessorEnvironment} of this region
+   */
+  protected void initializePruneState(RegionCoprocessorEnvironment env) {
+    Configuration conf = getConfiguration(env);
+    if (conf != null) {
+      pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
+                                    TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
+
+      if (Boolean.TRUE.equals(pruneEnable)) {
+        TableName pruneTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                                          TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+        long pruneFlushInterval = TimeUnit.SECONDS.toMillis(conf.getLong(
+          TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+          TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+
+        compactionState = new CompactionState(env, pruneTable, pruneFlushInterval);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Automatic invalid list pruning is enabled for table %s. Compaction state " +
+                                    "will be recorded in table %s",
+                                  env.getRegionInfo().getTable().getNameWithNamespaceInclAsString(),
+                                  pruneTable.getNameWithNamespaceInclAsString()));
+        }
+      }
+    }
+  }
+
+  /**
+   * Stop and clear state related to pruning.
+   */
+  protected void resetPruneState() {
+    pruneEnable = false;
+    if (compactionState != null) {
+      compactionState.stop();
+      compactionState = null;
+    }
+  }
+
+  private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) {
+    long numStoreFiles = 0;
+    for (Store store : c.getEnvironment().getRegion().getStores()) {
+      numStoreFiles += store.getStorefiles().size();
+    }
+    return numStoreFiles;
+  }
+
+  /**
+   * Filter used to include cells visible to in-progress transactions on flush and compaction.
+   */
+  static class IncludeInProgressFilter extends FilterBase {
+    private final long visibilityUpperBound;
+    private final Set<Long> invalidIds;
+    private final Filter txFilter;
+
+    public IncludeInProgressFilter(long upperBound, Collection<Long> invalids, Filter transactionFilter) {
+      this.visibilityUpperBound = upperBound;
+      this.invalidIds = Sets.newHashSet(invalids);
+      this.txFilter = transactionFilter;
+    }
+
+    @Override
+    public ReturnCode filterKeyValue(Cell cell) throws IOException {
+      // include all cells visible to in-progress transactions, except for those already marked as invalid
+      long ts = cell.getTimestamp();
+      if (ts > visibilityUpperBound) {
+        // include everything that could still be in-progress except invalids
+        if (invalidIds.contains(ts)) {
+          return ReturnCode.SKIP;
+        }
+        return ReturnCode.INCLUDE;
+      }
+      return txFilter.filterKeyValue(cell);
+    }
+  }
+}
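
Besides the hbase-site.xml setting shown in the class javadoc, the coprocessor
can also be attached per table at creation time. A minimal sketch against the
HBase 1.3 admin API ("mytable" and "f" are placeholders):

    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("mytable"));
    desc.addFamily(new HColumnDescriptor("f"));
    // load the transaction coprocessor on this table only
    desc.addCoprocessor(TransactionProcessor.class.getName());
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin()) {
      admin.createTable(desc);
    }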