Posted to commits@falcon.apache.org by pe...@apache.org on 2016/05/25 17:16:20 UTC

[1/3] falcon git commit: FALCON-1858 Support HBase as a storage backend for Falcon Titan graphDB

Repository: falcon
Updated Branches:
  refs/heads/master d37872e81 -> 9edf9e524


http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/test/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediatorTest.java
----------------------------------------------------------------------
diff --git a/titan/src/test/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediatorTest.java b/titan/src/test/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediatorTest.java
new file mode 100644
index 0000000..d0fd401
--- /dev/null
+++ b/titan/src/test/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediatorTest.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.locking;
+
+import com.thinkaurelius.titan.diskstorage.hbase.HBaseTransaction;
+import com.thinkaurelius.titan.diskstorage.util.time.TimestampProvider;
+import com.thinkaurelius.titan.diskstorage.util.time.Timestamps;
+import com.thinkaurelius.titan.diskstorage.StaticBuffer;
+import com.thinkaurelius.titan.diskstorage.util.KeyColumn;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.TimeUnit;
+
+public class LocalLockMediatorTest {
+
+    private static final String LOCK_NAMESPACE = "test";
+    private static final StaticBuffer LOCK_ROW = StaticArrayBuffer.of(new byte[]{1});
+    private static final StaticBuffer LOCK_COL = StaticArrayBuffer.of(new byte[]{1});
+    private static final KeyColumn kc = new KeyColumn(LOCK_ROW, LOCK_COL);
+    private static final HBaseTransaction mockTx1 = Mockito.mock(HBaseTransaction.class);
+    private static final HBaseTransaction mockTx2 = Mockito.mock(HBaseTransaction.class);
+
+    @Test
+    public void testLock() throws InterruptedException {
+        TimestampProvider times = Timestamps.MICRO;
+        LocalLockMediator<HBaseTransaction> llm =
+            new LocalLockMediator<HBaseTransaction>(LOCK_NAMESPACE, times);
+
+        //Expire immediately
+        Assert.assertTrue(llm.lock(kc, mockTx1, times.getTime(0, TimeUnit.NANOSECONDS)));
+        Assert.assertTrue(llm.lock(kc, mockTx2, times.getTime(Long.MAX_VALUE, TimeUnit.NANOSECONDS)));
+
+        llm = new LocalLockMediator<HBaseTransaction>(LOCK_NAMESPACE, times);
+
+        //Expire later
+        Assert.assertTrue(llm.lock(kc, mockTx1, times.getTime(Long.MAX_VALUE, TimeUnit.NANOSECONDS)));
+        //So second lock should fail on same keyCol
+        Assert.assertFalse(llm.lock(kc, mockTx2, times.getTime(Long.MAX_VALUE, TimeUnit.NANOSECONDS)));
+
+        //Unlock
+        Assert.assertTrue(llm.unlock(kc, mockTx1));
+        //Now locking should succeed
+        Assert.assertTrue(llm.lock(kc, mockTx2, times.getTime(Long.MAX_VALUE, TimeUnit.NANOSECONDS)));
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index 67c4acf..f98c5e3 100644
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -294,6 +294,13 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-titan</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
     </dependencies>
 
     <build>


[3/3] falcon git commit: FALCON-1858 Support HBase as a storage backend for Falcon Titan graphDB

Posted by pe...@apache.org.
FALCON-1858 Support HBase as a storage backend for Falcon Titan graphDB

This patch provides the following features:
1. Ability to build Falcon with the storage backend plugins but without their artifacts (HBase and BDB).
2. Ability to package a standalone HBase if desired.
3. Backport of the Titan 1.0 HBase 1.0 storage backend into Titan 0.5.4 (shaded classes, mostly similar to Apache Atlas's usage of the same; most of the addons/titan codebase).
4. Default build profile uses the BDB backend, with the BDB artifacts also included in the build.

Tested the HBase backend against HBase 1.1.5 and 1.2.1 environments. Ran test-patch successfully with the BDB artifacts enabled and disabled, and also with all potential addons enabled; example invocations for the new build profiles are sketched below.

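For reference, the new build profiles can be exercised roughly as follows. This is a sketch: the property names (bdb-plugin-only, package-standalone-hbase) are taken from the pom changes below, and exact invocations may vary with your environment.

$ # Default build: BDB backend, with the BDB (je) artifacts included
$ mvn clean install
$ # Build with the storage backend plugins but without the bdb/hbase artifacts
$ mvn clean install -Dbdb-plugin-only
$ # Also download and package a standalone HBase with the distribution
$ mvn clean install -Dpackage-standalone-hbase
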
Author: Venkat Ranganathan <ve...@hortonworks.com>

Reviewers: Peeyush <pe...@apache.org>, Balu <ba...@apache.org>

Closes #143 from vrangan/FALCON-1858 and squashes the following commits:

693eb05 [Venkat Ranganathan] Disabled hbase also by default and added a comment in startup properties
0b2d8e0 [Venkat Ranganathan] Merged with master with latest changes
7962570 [Venkat Ranganathan] FALCON-1858: Support HBase as a storage backend for Falcon Titan graphDB
a434564 [Venkat Ranganathan] FALCON-1976 :  Remove hadoop-2 profile


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/9edf9e52
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/9edf9e52
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/9edf9e52

Branch: refs/heads/master
Commit: 9edf9e52482f1591955fe5b44d6cab666e0b2e46
Parents: d37872e
Author: Venkat Ranganathan <ve...@hortonworks.com>
Authored: Wed May 25 22:45:50 2016 +0530
Committer: peeyush b <pb...@hortonworks.com>
Committed: Wed May 25 22:45:50 2016 +0530

----------------------------------------------------------------------
 common/pom.xml                                  |  25 +-
 distro/pom.xml                                  | 206 +++--
 docs/src/site/twiki/Configuration.twiki         |  63 ++
 docs/src/site/twiki/InstallationSteps.twiki     |   8 +-
 pom.xml                                         | 207 ++++-
 prism/pom.xml                                   |  27 +
 src/bin/package.sh                              |   2 +-
 src/conf/hbase-site.xml.template                |  44 +
 src/conf/startup.properties                     |  25 +-
 src/main/assemblies/standalone-package.xml      |   7 +
 titan/pom.xml                                   | 130 +++
 .../titan/diskstorage/hbase/AdminMask.java      |  62 ++
 .../titan/diskstorage/hbase/ConnectionMask.java |  30 +
 .../titan/diskstorage/hbase/HBaseAdmin0_98.java | 152 +++
 .../titan/diskstorage/hbase/HBaseAdmin1_0.java  | 135 +++
 .../titan/diskstorage/hbase/HBaseCompat.java    |  60 ++
 .../diskstorage/hbase/HBaseCompat0_98.java      |  58 ++
 .../titan/diskstorage/hbase/HBaseCompat1_0.java |  58 ++
 .../titan/diskstorage/hbase/HBaseCompat1_1.java |  58 ++
 .../diskstorage/hbase/HBaseCompatLoader.java    |  80 ++
 .../hbase/HBaseKeyColumnValueStore.java         | 397 ++++++++
 .../diskstorage/hbase/HBaseStoreManager.java    | 926 +++++++++++++++++++
 .../diskstorage/hbase/HBaseTransaction.java     |  75 ++
 .../diskstorage/hbase/HConnection0_98.java      |  49 +
 .../titan/diskstorage/hbase/HConnection1_0.java |  50 +
 .../titan/diskstorage/hbase/HTable0_98.java     |  60 ++
 .../titan/diskstorage/hbase/HTable1_0.java      |  60 ++
 .../titan/diskstorage/hbase/TableMask.java      |  40 +
 .../diskstorage/locking/LocalLockMediator.java  | 345 +++++++
 .../locking/LocalLockMediatorTest.java          |  60 ++
 webapp/pom.xml                                  |   7 +
 31 files changed, 3375 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 377ee71..debb615 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -32,7 +32,20 @@
     <name>Apache Falcon Commons</name>
     <packaging>jar</packaging>
 
-
+    <profiles>
+        <profile>
+            <id>bdb-plugin-only</id>
+            <activation>
+               <activeByDefault>false</activeByDefault>
+               <property>
+                  <name>bdb-plugin-only</name>
+              </property>
+            </activation>
+            <properties>
+                 <tests.excluded>**/*Metadata*Test.java</tests.excluded>
+            </properties>
+        </profile>
+    </profiles>
     <dependencies>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
@@ -72,7 +85,6 @@
             <groupId>commons-beanutils</groupId>
             <artifactId>commons-beanutils</artifactId>
         </dependency>
-
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
@@ -243,6 +255,15 @@
             </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                         <exclude>${tests.excluded}</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-jar-plugin</artifactId>
                 <version>2.4</version>
                 <configuration>

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/distro/pom.xml
----------------------------------------------------------------------
diff --git a/distro/pom.xml b/distro/pom.xml
index e3e3a5f..4351400 100644
--- a/distro/pom.xml
+++ b/distro/pom.xml
@@ -1,84 +1,142 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
 
-       http://www.apache.org/licenses/LICENSE-2.0
+http://www.apache.org/licenses/LICENSE-2.0
 
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-    <modelVersion>4.0.0</modelVersion>
+  <modelVersion>4.0.0</modelVersion>
 
-    <parent>
-        <groupId>org.apache.falcon</groupId>
-        <artifactId>falcon-main</artifactId>
-        <version>0.10-SNAPSHOT</version>
-    </parent>
-    <artifactId>falcon-distro</artifactId>
-    <description>Apache Falcon Distro</description>
-    <name>Apache Falcon Distro</name>
-    <packaging>pom</packaging>
-      <profiles>
-         <profile>
-            <id>distributed</id>
-             <build>
-                <plugins>
-                    <plugin>
-                        <artifactId>maven-assembly-plugin</artifactId>
-                         <configuration>
-                            <descriptors>
-                                <descriptor>../src/main/assemblies/distributed-package.xml</descriptor>
-                                <descriptor>../src/main/assemblies/src-package.xml</descriptor>
-                            </descriptors>
-                            <finalName>apache-falcon-distributed-${project.version}</finalName>
-                        </configuration>
-                         <executions>
-                           <execution>
-                             <id>dist-assembly</id>
-                             <phase>package</phase>
-                             <goals>
-                               <goal>single</goal>
-                             </goals>
-                          </execution>
-                        </executions>
-                    </plugin>
-                </plugins>
-            </build>
-        </profile>
-     </profiles>
-          
-           <build>
-             <plugins>
-               <plugin>
-                 <artifactId>maven-assembly-plugin</artifactId>
+  <parent>
+    <groupId>org.apache.falcon</groupId>
+    <artifactId>falcon-main</artifactId>
+    <version>0.10-SNAPSHOT</version>
+  </parent>
+  <artifactId>falcon-distro</artifactId>
+  <description>Apache Falcon Distro</description>
+  <name>Apache Falcon Distro</name>
+
+  <properties>
+    <hbase.target.dir>${project.build.directory}/hbase</hbase.target.dir>
+    <hbase.archive.host>http://apache.mirrors.pair.com</hbase.archive.host>
+    <hbase.archive.version>${hbase.version}</hbase.archive.version>
+    <hbase.uri.path>hbase/stable</hbase.uri.path>
+    <hbase.archive>${hbase.archive.host}/${hbase.uri.path}/hbase-${hbase.archive.version}-bin.tar.gz</hbase.archive>
+    <hbase.root.folder>hbase-${hbase.archive.version}</hbase.root.folder>
+  </properties>
+
+  <packaging>pom</packaging>
+  <profiles>
+    <profile>
+      <id>distributed</id>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-assembly-plugin</artifactId>
+            <configuration>
+              <descriptors>
+                <descriptor>../src/main/assemblies/distributed-package.xml</descriptor>
+                <descriptor>../src/main/assemblies/src-package.xml</descriptor>
+              </descriptors>
+              <finalName>apache-falcon-distributed-${project.version}</finalName>
+            </configuration>
+            <executions>
+              <execution>
+                <id>dist-assembly</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>single</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+       <id>package-standalone-hbase</id>
+       <activation>
+           <activeByDefault>false</activeByDefault>
+           <property>
+              <name>package-standalone-hbase</name>
+           </property>
+       </activation>
+       <build>
+         <plugins>
+           <plugin>
+             <groupId>org.apache.maven.plugins</groupId>
+             <artifactId>maven-antrun-plugin</artifactId>
+             <version>1.7</version>
+             <executions>
+               <execution>
+                 <phase>generate-resources</phase>
+                 <goals>
+                   <goal>run</goal>
+                 </goals>
                  <configuration>
-                    <descriptors>
-                        <descriptor>../src/main/assemblies/standalone-package.xml</descriptor>
-                        <descriptor>../src/main/assemblies/src-package.xml</descriptor>
-                    </descriptors>
-                    <finalName>apache-falcon-${project.version}</finalName>
-                </configuration>
-                <executions>
-                   <execution>
-                     <id>dist-assembly</id>
-                     <phase>package</phase>
-                     <goals>
-                        <goal>single</goal>
-                     </goals>
-                  </execution>
-                </executions>
-             </plugin>
-           </plugins>
-         </build>
+                   <target name="Download HBase">
+                     <mkdir dir="${hbase.target.dir}"/>
+                     <get
+                         src="${hbase.archive}"
+                         dest="${project.build.directory}/hbase-${hbase.archive.version}-bin.tar.gz"
+                         usetimestamp="true"
+                         verbose="true" skipexisting="true"
+                         />
+                     <untar
+                         src="${project.build.directory}/hbase-${hbase.archive.version}-bin.tar.gz"
+                         dest="${project.build.directory}/hbase.temp"
+                         compression="gzip"
+                         />
+                     <copy todir="${hbase.target.dir}">
+                       <fileset dir="${project.build.directory}/hbase.temp/${hbase.root.folder}">
+                         <include name="**/*"/>
+                       </fileset>
+                     </copy>
+                     <delete dir="${project.build.directory}/hbase.temp"/>
+                   </target>
+                 </configuration>
+               </execution>
+             </executions>
+           </plugin>
+         </plugins>
+       </build>
+    </profile>
+  </profiles>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptors>
+            <descriptor>../src/main/assemblies/standalone-package.xml</descriptor>
+            <descriptor>../src/main/assemblies/src-package.xml</descriptor>
+          </descriptors>
+          <finalName>apache-falcon-${project.version}</finalName>
+        </configuration>
+        <executions>
+          <execution>
+            <id>dist-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
 </project>

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/docs/src/site/twiki/Configuration.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Configuration.twiki b/docs/src/site/twiki/Configuration.twiki
index b90efac..bfca3d8 100644
--- a/docs/src/site/twiki/Configuration.twiki
+++ b/docs/src/site/twiki/Configuration.twiki
@@ -47,6 +47,9 @@ before any commands are executed. The following environment variables are availa
 
 # Where do you want to expand the war file. By Default it is in /server/webapp dir under the base install dir.
 #export FALCON_EXPANDED_WEBAPP_DIR=
+
+# Any additional classpath elements to be added to the Falcon server/client classpath
+#export FALCON_EXTRA_CLASS_PATH=
 </verbatim>
 
 ---++Advanced Configurations
@@ -321,6 +324,66 @@ schedule entities natively on Falcon, you will need to add some additional prope
 to <verbatim>$FALCON_HOME/conf/startup.properties</verbatim> before starting the Falcon Server.
 For details on the same, refer to [[FalconNativeScheduler][Falcon Native Scheduler]]
 
+---+++Titan GraphDB backend
+You can choose either version 5.0.73 of BerkeleyDB (the default for Falcon for the last few releases) or version 1.1.x or later of HBase as the backend database. Falcon release distributions include the Titan storage plugins for both BerkeleyDB and HBase.
+
+---++++Using BerkeleyDB backend
+Falcon distributions may not package the Berkeley DB artifact (je-5.0.73.jar), depending on the build profile.  If Berkeley DB is not packaged, you can download the Berkeley DB jar file from the URL: <verbatim>http://download.oracle.com/otn/berkeley-db/je-5.0.73.zip</verbatim>.  The following properties describe an example Berkeley DB graph storage backend that can be specified in the configuration file <verbatim>$FALCON_HOME/conf/startup.properties</verbatim>.
+
+<verbatim>
+# Graph Storage
+*.falcon.graph.storage.directory=${user.dir}/target/graphdb
+*.falcon.graph.storage.backend=berkeleyje
+*.falcon.graph.serialize.path=${user.dir}/target/graphdb
+</verbatim>
+
+---++++Using HBase backend
+
+To use HBase as the backend, it is recommended that an HBase cluster be provisioned with a distributed-mode configuration, primarily because of the support for Kerberos-enabled clusters and HA considerations.  Depending on the build profile, a standalone HBase version can be packaged with the Falcon binary distribution.  Along with this, a template for <verbatim>hbase-site.xml</verbatim> is provided, which can be used to start a standalone-mode HBase environment for development/testing purposes.
+
+Basic configuration
+
+<verbatim>
+*.falcon.graph.storage.backend=hbase
+# For standalone mode, specify localhost
+# For distributed mode, specify the ZooKeeper quorum here - for more information refer to http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2
+*.falcon.graph.storage.hostname=<ZooKeeper Quorum>
+</verbatim>
+
+The HBase configuration file (hbase-site.xml) and the HBase libraries need to be added to the classpath when Falcon starts up.  The following must be appended to the environment variable <verbatim>FALCON_EXTRA_CLASS_PATH</verbatim> in <verbatim>$FALCON_HOME/bin/falcon-env.sh</verbatim>.  Additionally, the correct HBase client libraries need to be added.  For example,
+<verbatim>
+export FALCON_EXTRA_CLASS_PATH=`${HBASE_HOME}/bin/hbase classpath`
+</verbatim>
+
+Table name
+We recommend that in the startup configuration the table name for Titan storage be <verbatim>falcon_titan</verbatim>, so that multiple applications using Titan can share the same HBase cluster.  This can be set by specifying the table name using the startup property given below.  The default value is shown.
+
+<verbatim>
+*.falcon.graph.storage.hbase.table=falcon_titan
+</verbatim>
+
+Permissions
+
+When Falcon is configured with HBase as the storage backend, Titan needs sufficient authorizations to create and access an HBase table.  In a secure cluster it may be necessary to grant permissions to the <verbatim>falcon</verbatim> user for the <verbatim>falcon_titan</verbatim> table (or whatever table name was specified for the property <verbatim>*.falcon.graph.storage.hbase.table</verbatim>).
+
+With Ranger, a policy can be configured for <verbatim>falcon_titan</verbatim>.
+
+Without Ranger, HBase shell can be used to set the permissions.
+
+<verbatim>
+   su hbase
+   kinit -k -t <hbase keytab> <hbase principal>
+   echo "grant 'falcon', 'RWXCA', 'falcon_titan'" | hbase shell
+</verbatim>
+
+Advanced configuration
+
+The HBase storage backend support in Titan has a few other configuration options; they can be set in <verbatim>$FALCON_HOME/conf/startup.properties</verbatim> by prefixing the Titan property name with <verbatim>*.falcon.graph.</verbatim>.
+
+Please refer to <verbatim>http://s3.thinkaurelius.com/docs/titan/0.5.4/titan-config-ref.html#_storage</verbatim> for generic storage properties, <verbatim>http://s3.thinkaurelius.com/docs/titan/0.5.4/titan-config-ref.html#_storage_berkeleydb</verbatim> for Berkeley DB properties, and <verbatim>http://s3.thinkaurelius.com/docs/titan/0.5.4/titan-config-ref.html#_storage_hbase</verbatim> for HBase storage backend properties.
+
+
+
 ---+++Adding Extension Libraries
 
 Library extensions allows users to add custom libraries to entity lifecycles such as feed retention, feed replication

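As a concrete illustration of the HBase setup described in the Configuration changes above, the following sketch brings up the optionally packaged standalone HBase for development/testing and wires its classpath into Falcon. The <falcon home>/hbase directory and the template location are assumptions based on the assembly and template changes in this patch; start-hbase.sh and the hbase classpath command are standard HBase tooling.

$ cd <falcon home>/hbase
$ # The template ships with this patch; fill in ${hbase_home} before use
$ cp ../conf/hbase-site.xml.template conf/hbase-site.xml
$ bin/start-hbase.sh
$ # Expose the HBase config and client libraries to Falcon, as described above
$ export FALCON_EXTRA_CLASS_PATH=`bin/hbase classpath`
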
http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/docs/src/site/twiki/InstallationSteps.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/InstallationSteps.twiki b/docs/src/site/twiki/InstallationSteps.twiki
index a5ee2cc..93b1eab 100644
--- a/docs/src/site/twiki/InstallationSteps.twiki
+++ b/docs/src/site/twiki/InstallationSteps.twiki
@@ -20,8 +20,10 @@ $git clone https://git-wip-us.apache.org/repos/asf/falcon.git falcon
 ---+++Step 2 - Build Falcon
 
 <verbatim>
-$cd falcon
-$export MAVEN_OPTS="-Xmx1024m -XX:MaxPermSize=256m -noverify" && mvn clean install
+$ cd falcon
+$ export MAVEN_OPTS="-Xmx1024m -XX:MaxPermSize=256m -noverify"
+$ mvn clean install 
+
 </verbatim>
 It builds and installs the package into the local repository, for use as a dependency in other projects locally.
 
@@ -75,6 +77,8 @@ $src/bin/package.sh <<hadoop-version>> <<oozie-version>>
 >> ex. src/bin/package.sh 2.5.0 4.0.0
 >> Falcon package is available in <<falcon home>>/target/apache-falcon-<<version>>-bin.tar.gz
 >> Oozie package is available in <<falcon home>>/target/oozie-4.0.1-distro.tar.gz
+>> __IMPORTANT:  You need to download je-5.0.73 from http://download.oracle.com/otn/berkeley-db/je-5.0.73.zip and extract je-5.0.73 under the Falcon webapp directory, or provision an HBase cluster for use as the Falcon graphdb backend.
+    Depending on the graphdb backend choice, update startup.properties appropriately.__
 </verbatim>
 
 *NOTE:* If you have a separate Apache Oozie installation, you will need to follow some additional steps:

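A sketch of the Berkeley DB step called out in the IMPORTANT note above. The WEB-INF/lib destination is an assumption based on the default expanded-webapp layout, and the je zip is assumed to unpack to je-5.0.73/lib/je-5.0.73.jar; adjust both to your install.

$ curl -L -O http://download.oracle.com/otn/berkeley-db/je-5.0.73.zip
$ unzip je-5.0.73.zip
$ # Hypothetical destination; any location on the Falcon server classpath works
$ cp je-5.0.73/lib/je-5.0.73.jar <falcon home>/server/webapp/falcon/WEB-INF/lib/
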
http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 52af03e..0d804b2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,6 +99,9 @@
         <oozie.buildversion>${oozie.version}-falcon</oozie.buildversion>
         <oozie.forcebuild>false</oozie.forcebuild>
         <activemq.version>5.12.0</activemq.version>
+        <tinkerpop.version>2.6.0</tinkerpop.version>
+        <titan.version>0.5.4</titan.version>
+        <hbase.version>1.1.5</hbase.version>
         <hive.version>0.13.1</hive.version>
         <spark.version>1.6.1</spark.version>
         <jetty.version>6.1.26</jetty.version>
@@ -118,6 +121,59 @@
 
     <profiles>
         <profile>
+            <id>bdb-plugin-only</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+                <property>
+                    <name>bdb-plugin-only</name>
+                </property>
+            </activation>
+            <dependencyManagement>
+                <dependencies>
+                   <dependency>
+                        <groupId>com.thinkaurelius.titan</groupId>
+                        <artifactId>titan-berkeleyje</artifactId>
+                        <version>0.5.4</version>
+                        <exclusions>
+                            <exclusion>
+                               <groupId>com.sleepycat</groupId>
+                                <artifactId>je</artifactId>
+                            </exclusion>
+                        </exclusions>
+                    </dependency>
+                    <!--
+                     - optional=true gets je-5.0.73.jar into war classpath
+                     - without making it part of WEB-INF/lib
+                     - Version hardcoded to 5.0.73 with sleepycat licence
+                     -->
+                    <dependency>
+                        <groupId>com.sleepycat</groupId>
+                        <artifactId>je</artifactId>
+                        <version>5.0.73</version>
+                        <optional>true</optional>
+                    </dependency>
+                </dependencies>
+            </dependencyManagement>
+        </profile>
+        <profile>
+            <id>bdb-backend</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+               <property>
+                    <name>!bdb-plugin-only</name>
+                </property>
+            </activation>
+            <dependencyManagement>
+               <dependencies>
+                    <dependency>
+                        <groupId>com.thinkaurelius.titan</groupId>
+                        <artifactId>titan-berkeleyje</artifactId>
+                        <version>0.5.4</version>
+                    </dependency>
+                </dependencies>
+           </dependencyManagement>
+        </profile>
+        <profile>
             <id>test-patch</id>
             <build>
                 <plugins>
@@ -245,6 +301,8 @@
                                             <property>hive.version</property>
                                             <regex>^(1.2.*)</regex>
                                             <regexMessage>HiveDR only supports hive version >= 1.2.0</regexMessage>
+                                        </requireProperty>
+                                        <requireProperty>
                                             <property>oozie.version</property>
                                             <regex>^(4.2.*)</regex>
                                             <regexMessage>HiveDR only supports oozie version >= 4.2.0</regexMessage>
@@ -281,7 +339,9 @@
                                         <requireProperty>
                                             <property>hadoop.version</property>
                                             <regex>^(2.7.*)</regex>
-                                            <regexMessage>HDFS Snapshot replication only works with hadoop version >= 2.7.0</regexMessage>
+                                            <regexMessage>HDFS Snapshot replication only works with hadoop version >=
+                                                2.7.0
+                                            </regexMessage>
                                         </requireProperty>
                                     </rules>
                                     <fail>true</fail>
@@ -310,6 +370,7 @@
         <module>client</module>
         <module>cli</module>
         <module>metrics</module>
+        <module>titan</module>
         <module>common</module>
         <module>test-util</module>
         <module>hadoop-dependencies</module>
@@ -640,6 +701,60 @@
 
             <dependency>
                 <groupId>org.apache.falcon</groupId>
+                <artifactId>falcon-titan</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+
+            <!-- Graph DB -->
+
+            <dependency>
+                <groupId>com.thinkaurelius.titan</groupId>
+                <artifactId>titan-core</artifactId>
+                <version>${titan.version}</version>
+                <exclusions>
+                    <!-- rexster does not work with servlet-api -->
+                    <exclusion>
+                        <groupId>com.tinkerpop.rexster</groupId>
+                        <artifactId>rexster-core</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>com.tinkerpop.rexster</groupId>
+                        <artifactId>rexster-server</artifactId>
+                    </exclusion>
+                    <!-- asm 4.0 does not work with jersey asm 3.1 -->
+                    <exclusion>
+                        <groupId>com.tinkerpop</groupId>
+                        <artifactId>frames</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>com.esotericsoftware.reflectasm</groupId>
+                        <artifactId>reflectasm</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.ow2.asm</groupId>
+                        <artifactId>asm</artifactId>
+                    </exclusion>
+                    <exclusion> <!-- GPL license imported from ganglia -->
+                        <groupId>org.acplt</groupId>
+                        <artifactId>oncrpc</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
+                <groupId>com.thinkaurelius.titan</groupId>
+                <artifactId>titan-es</artifactId>
+                <version>${titan.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>com.vividsolutions</groupId>
+                <artifactId>jts</artifactId>
+                <version>1.13</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.falcon</groupId>
                 <artifactId>falcon-hadoop-dependencies</artifactId>
                 <version>${project.version}</version>
             </dependency>
@@ -1100,43 +1215,7 @@
             <dependency>
                 <groupId>com.tinkerpop.blueprints</groupId>
                 <artifactId>blueprints-core</artifactId>
-                <version>2.5.0</version>
-            </dependency>
-
-            <dependency>
-                <groupId>com.thinkaurelius.titan</groupId>
-                <artifactId>titan-core</artifactId>
-                <version>0.5.4</version>
-                <exclusions>
-                    <!-- rexster does not work with servlet-api -->
-                    <exclusion>
-                        <groupId>com.tinkerpop.rexster</groupId>
-                        <artifactId>rexster-core</artifactId>
-                    </exclusion>
-                    <exclusion>
-                        <groupId>com.tinkerpop.rexster</groupId>
-                        <artifactId>rexster-server</artifactId>
-                    </exclusion>
-                    <!-- asm 4.0 does not work with jersey asm 3.1 -->
-                    <exclusion>
-                        <groupId>com.tinkerpop</groupId>
-                        <artifactId>frames</artifactId>
-                    </exclusion>
-                    <exclusion>
-                        <groupId>com.esotericsoftware.reflectasm</groupId>
-                        <artifactId>reflectasm</artifactId>
-                    </exclusion>
-                    <exclusion>
-                        <groupId>org.ow2.asm</groupId>
-                        <artifactId>asm</artifactId>
-                    </exclusion>
-                </exclusions>
-            </dependency>
-
-            <dependency>
-                <groupId>com.thinkaurelius.titan</groupId>
-                <artifactId>titan-berkeleyje</artifactId>
-                <version>0.5.4</version>
+                <version>${tinkerpop.version}</version>
             </dependency>
 
             <dependency>
@@ -1175,6 +1254,39 @@
                 <version>3.0.2</version>
             </dependency>
 
+            <dependency>
+                <groupId>com.thinkaurelius.titan</groupId>
+                <artifactId>titan-hbase</artifactId>
+                <version>${titan.version}</version>
+           </dependency>
+            <dependency>
+                <groupId>org.apache.hbase</groupId>
+                <artifactId>hbase-client</artifactId>
+                <version>${hbase.version}</version>
+                <scope>provided</scope>
+                <exclusions>
+                    <exclusion>
+                        <artifactId>avro</artifactId>
+                        <groupId>org.apache.avro</groupId>
+                    </exclusion>
+                    <exclusion>
+                        <artifactId>jruby-complete</artifactId>
+                        <groupId>org.jruby</groupId>
+                    </exclusion>
+                    <exclusion>
+                        <artifactId>asm</artifactId>
+                        <groupId>asm</groupId>
+                    </exclusion>
+                    <exclusion>
+                        <artifactId>*</artifactId>
+                        <groupId>org.apache.hadoop</groupId>
+                    </exclusion>
+                    <exclusion>
+                        <artifactId>*</artifactId>
+                        <groupId>org.mortbay.jetty</groupId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
@@ -1341,7 +1453,6 @@
                     </execution>
                 </executions>
             </plugin>
-
             <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>buildnumber-maven-plugin</artifactId>
@@ -1394,21 +1505,21 @@
                             <goal>javadoc</goal>
                             <goal>jar</goal>
                         </goals>
-                     </execution>
+                    </execution>
                 </executions>
             </plugin>
 
-              <plugin>
+            <plugin>
                 <artifactId>maven-assembly-plugin</artifactId>
                 <version>2.6</version>
-                  <configuration>
-                      <descriptors>
-                          <descriptor>src/main/assemblies/assembly-src.xml</descriptor>
-                      </descriptors>
-                      <finalName>falcon-${project.version}</finalName>
-                  </configuration>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assemblies/assembly-src.xml</descriptor>
+                    </descriptors>
+                    <finalName>falcon-${project.version}</finalName>
+                </configuration>
             </plugin>
- 
+
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/prism/pom.xml
----------------------------------------------------------------------
diff --git a/prism/pom.xml b/prism/pom.xml
index 1a6c2cc..2eddbc1 100644
--- a/prism/pom.xml
+++ b/prism/pom.xml
@@ -32,6 +32,22 @@
     <name>Apache Falcon Prism</name>
     <packaging>war</packaging>
 
+    <profiles>
+        <profile>
+            <id>bdb-plugin-only</id>
+            <activation>
+               <activeByDefault>false</activeByDefault>
+              <property>
+                  <name>bdb-plugin-only</name>
+               </property>
+            </activation>
+            <properties>
+                 <tests.excluded.1>**/*Metadata*Test.java</tests.excluded.1>
+                 <tests.excluded.2>**/*InstanceManager*Test.java</tests.excluded.2>
+            </properties>
+        </profile>
+    </profiles>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
@@ -172,6 +188,7 @@
             <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>keytool-maven-plugin</artifactId>
+                <version>1.5</version>
                 <executions>
                     <execution>
                         <phase>generate-resources</phase>
@@ -273,6 +290,16 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                         <exclude>${tests.excluded.1}</exclude>
+                         <exclude>${tests.excluded.2}</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
         </plugins>
     </build>
 </project>

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/src/bin/package.sh
----------------------------------------------------------------------
diff --git a/src/bin/package.sh b/src/bin/package.sh
index 328d6f1..a2cc1c2 100755
--- a/src/bin/package.sh
+++ b/src/bin/package.sh
@@ -63,7 +63,7 @@ tar -zcvf ${FALCON_SRC}/target/oozie-$OOZIE_VERSION-distro.tar.gz oozie-*
 
 popd
 popd
-mvn assembly:assembly -P $HADOOP_PROFILE -Dhadoop.version=$HADOOP_VERSION -Doozie.version=$OOZIE_VERSION -Doozie.forcebuild=true -DskipTests -DskipCheck=true
+mvn assembly:assembly -P $HADOOP_PROFILE -Dhadoop.version=$HADOOP_VERSION -Doozie.version=$OOZIE_VERSION -Doozie.forcebuild=true -Dbdb-plugin-only -DskipTests -DskipCheck=true
 
 echo "Falcon pacakge is available in ${FALCON_SRC}/target/falcon-<<version>>-bin.tar.gz"
 echo "Oozie pacakge is available in ${FALCON_SRC}/target/oozie-$OOZIE_VERSION/distro/target/oozie-$OOZIE_VERSION-distro.tar.gz"

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/src/conf/hbase-site.xml.template
----------------------------------------------------------------------
diff --git a/src/conf/hbase-site.xml.template b/src/conf/hbase-site.xml.template
new file mode 100644
index 0000000..2c72617
--- /dev/null
+++ b/src/conf/hbase-site.xml.template
@@ -0,0 +1,44 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+  <property>
+    <name>hbase.rootdir</name>
+    <value>file://${hbase_home}/root</value>
+  </property>
+  <property>
+    <name>hbase.zookeeper.property.dataDir</name>
+    <value>${hbase_home}/zookeeper-data</value>
+  </property>
+  <property>
+    <name>hbase.master.info.port</name>
+    <value>62510</value>
+  </property>
+  <property>
+    <name>hbase.regionserver.info.port</name>
+    <value>62530</value>
+  </property>
+  <property>
+    <name>hbase.master.port</name>
+    <value>62500</value>
+  </property>
+  <property>
+    <name>hbase.regionserver.port</name>
+    <value>62520</value>
+  </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 1a2d411..d732013 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -173,9 +173,26 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 *.falcon.graph.blueprints.graph=com.thinkaurelius.titan.core.TitanFactory
 
 # Graph Storage
-*.falcon.graph.storage.directory=/${falcon.home}/data/graphdb
-*.falcon.graph.storage.backend=berkeleyje
-*.falcon.graph.serialize.path=${user.dir}/target/graphdb
+# IMPORTANT: Please enable one of the HBase or BerkeleyDB backends below,
+# after the backend requirements have been provisioned as needed.
+
+# Enable the following for Berkeley DB.  Make sure je-5.0.73.jar is
+# downloaded and available under Falcon webapp directory or under falcon
+# server classpath.
+
+#*.falcon.graph.storage.directory=/${falcon.home}/data/graphdb
+#*.falcon.graph.storage.backend=berkeleyje
+
+
+# Enable the following for HBase
+#*.falcon.graph.storage.backend=hbase
+# For standalone mode, set hostname to localhost
+# For distributed mode, set it to the ZooKeeper quorum
+# @see http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2
+
+#*.falcon.graph.storage.hostname=localhost
+#*.falcon.graph.serialize.path=${user.dir}/target/graphdb
+#*.falcon.graph.storage.hbase.table=falcon_titan
 
 # Avoid acquiring read lock when iterating over large graphs
 # See http://s3.thinkaurelius.com/docs/titan/0.5.4/bdb.html
@@ -318,4 +335,4 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 #*.falcon.graphite.hostname=localhost
 #*.falcon.graphite.port=2003
 #*.falcon.graphite.frequency=1
-#*.falcon.graphite.prefix=falcon
\ No newline at end of file
+#*.falcon.graphite.prefix=falcon

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/src/main/assemblies/standalone-package.xml
----------------------------------------------------------------------
diff --git a/src/main/assemblies/standalone-package.xml b/src/main/assemblies/standalone-package.xml
index e97eceb..54d89ce 100644
--- a/src/main/assemblies/standalone-package.xml
+++ b/src/main/assemblies/standalone-package.xml
@@ -77,6 +77,13 @@
         </fileSet>
 
         <fileSet>
+            <directory>target/hbase</directory>
+            <outputDirectory>hbase</outputDirectory>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
             <directory>../logs</directory>
             <outputDirectory>logs</outputDirectory>
             <directoryMode>0777</directoryMode>

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/pom.xml
----------------------------------------------------------------------
diff --git a/titan/pom.xml b/titan/pom.xml
new file mode 100644
index 0000000..1409cfa
--- /dev/null
+++ b/titan/pom.xml
@@ -0,0 +1,130 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>falcon-main</artifactId>
+        <groupId>org.apache.falcon</groupId>
+        <version>0.10-SNAPSHOT</version>
+    </parent>
+    <artifactId>falcon-titan</artifactId>
+    <description>Shaded Titan HBase 1.0 storage backend classes, backported to Titan 0.5.4, for Falcon</description>
+    <name>Apache Falcon Titan 1.0 support</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.thinkaurelius.titan</groupId>
+            <artifactId>titan-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.vividsolutions</groupId>
+            <artifactId>jts</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.thinkaurelius.titan</groupId>
+            <artifactId>titan-es</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.thinkaurelius.titan</groupId>
+            <artifactId>titan-berkeleyje</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+        </dependency>
+
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.4</version>
+                <configuration>
+                    <excludes>
+                        <exclude>**/log4j.xml</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>1.2.1</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>checkstyle-check</id>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                        <phase>verify</phase>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </execution>
+                </executions>
+             </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>findbugs-check</id>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                        <phase>verify</phase>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </execution>
+                </executions>
+             </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java
new file mode 100644
index 0000000..e255f1b
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+
+/**
+ * This interface hides ABI/API breaking changes that HBase has made to its Admin/HBaseAdmin over the course
+ * of development from 0.94 to 1.0 and beyond.
+ */
+public interface AdminMask extends Closeable
+{
+
+    void clearTable(String tableName, long timestamp) throws IOException;
+
+    HTableDescriptor getTableDescriptor(String tableName) throws TableNotFoundException, IOException;
+
+    boolean tableExists(String tableName) throws IOException;
+
+    void createTable(HTableDescriptor desc) throws IOException;
+
+    void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException;
+
+    /**
+     * Estimate the number of regionservers in the HBase cluster.
+     *
+     * This is usually implemented by calling
+     * {@link HBaseAdmin#getClusterStatus()} and then
+     * {@link ClusterStatus#getServers()} and finally {@code size()} on the
+     * returned server list.
+     *
+     * @return the number of servers in the cluster or -1 if it could not be determined
+     */
+    int getEstimatedRegionServerCount();
+
+    void disableTable(String tableName) throws IOException;
+
+    void enableTable(String tableName) throws IOException;
+
+    boolean isTableDisabled(String tableName) throws IOException;
+
+    void addColumn(String tableName, HColumnDescriptor columnDescriptor) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java
new file mode 100644
index 0000000..feb578b
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * This interface hides ABI/API breaking changes that HBase has made to its (H)Connection class over the course
+ * of development from 0.94 to 1.0 and beyond.
+ */
+public interface ConnectionMask extends Closeable
+{
+
+    TableMask getTable(String name) throws IOException;
+
+    AdminMask getAdmin() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java
new file mode 100644
index 0000000..0cd4795
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.thinkaurelius.titan.util.system.IOUtils;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+
+public class HBaseAdmin0_98 implements AdminMask
+{
+
+    private static final Logger log = LoggerFactory.getLogger(HBaseAdmin0_98.class);
+
+    private final HBaseAdmin adm;
+
+    public HBaseAdmin0_98(HBaseAdmin adm)
+    {
+        this.adm = adm;
+    }
+
+    @Override
+    public void clearTable(String tableName, long timestamp) throws IOException
+    {
+        if (!adm.tableExists(tableName)) {
+            log.debug("clearStorage() called before table {} was created, skipping.", tableName);
+            return;
+        }
+
+        // Unfortunately, linear scanning and deleting rows is faster in HBase < 1 when running integration tests than
+        // disabling and deleting tables.
+        HTable table = null;
+
+        try {
+            table = new HTable(adm.getConfiguration(), tableName);
+
+            Scan scan = new Scan();
+            scan.setBatch(100);
+            scan.setCacheBlocks(false);
+            scan.setCaching(2000);
+            scan.setTimeRange(0, Long.MAX_VALUE);
+            scan.setMaxVersions(1);
+
+            ResultScanner scanner = null;
+
+            try {
+                scanner = table.getScanner(scan);
+
+                for (Result res : scanner) {
+                    Delete d = new Delete(res.getRow());
+
+                    d.setTimestamp(timestamp);
+                    table.delete(d);
+                }
+            } finally {
+                IOUtils.closeQuietly(scanner);
+            }
+        } finally {
+            IOUtils.closeQuietly(table);
+        }
+    }
+
+    @Override
+    public HTableDescriptor getTableDescriptor(String tableName) throws TableNotFoundException, IOException
+    {
+        return adm.getTableDescriptor(tableName.getBytes());
+    }
+
+    @Override
+    public boolean tableExists(String tableName) throws IOException
+    {
+        return adm.tableExists(tableName);
+    }
+
+    @Override
+    public void createTable(HTableDescriptor desc) throws IOException
+    {
+        adm.createTable(desc);
+    }
+
+    @Override
+    public void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException
+    {
+        adm.createTable(desc, startKey, endKey, numRegions);
+    }
+
+    @Override
+    public int getEstimatedRegionServerCount()
+    {
+        int serverCount = -1;
+        try {
+            serverCount = adm.getClusterStatus().getServers().size();
+            log.debug("Read {} servers from HBase ClusterStatus", serverCount);
+        } catch (IOException e) {
+            log.debug("Unable to retrieve HBase cluster status", e);
+        }
+        return serverCount;
+    }
+
+    @Override
+    public void disableTable(String tableName) throws IOException
+    {
+        adm.disableTable(tableName);
+    }
+
+    @Override
+    public void enableTable(String tableName) throws IOException
+    {
+        adm.enableTable(tableName);
+    }
+
+    @Override
+    public boolean isTableDisabled(String tableName) throws IOException
+    {
+        return adm.isTableDisabled(tableName);
+    }
+
+    @Override
+    public void addColumn(String tableName, HColumnDescriptor columnDescriptor) throws IOException
+    {
+        adm.addColumn(tableName, columnDescriptor);
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        adm.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java
new file mode 100644
index 0000000..7e8f72d
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotDisabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+
+public class HBaseAdmin1_0 implements AdminMask
+{
+
+    private static final Logger log = LoggerFactory.getLogger(HBaseAdmin1_0.class);
+
+    private final Admin adm;
+
+    public HBaseAdmin1_0(HBaseAdmin adm)
+    {
+        this.adm = adm;
+    }
+
+    @Override
+    public void clearTable(String tableString, long timestamp) throws IOException
+    {
+        TableName tableName = TableName.valueOf(tableString);
+
+        if (!adm.tableExists(tableName)) {
+            log.debug("Attempted to clear table {} before it exists (noop)", tableString);
+            return;
+        }
+
+        if (!adm.isTableDisabled(tableName))
+            adm.disableTable(tableName);
+
+        if (!adm.isTableDisabled(tableName))
+            throw new RuntimeException("Unable to disable table " + tableName);
+
+        // This API call appears to both truncate and reenable the table.
+        log.info("Truncating table {}", tableName);
+        adm.truncateTable(tableName, true /* preserve splits */);
+
+        try {
+            adm.enableTable(tableName);
+        } catch (TableNotDisabledException e) {
+            // This triggers seemingly every time in testing with 1.0.2.
+            log.debug("Table automatically reenabled by truncation: {}", tableName, e);
+        }
+    }
+
+    @Override
+    public HTableDescriptor getTableDescriptor(String tableString) throws TableNotFoundException, IOException
+    {
+        return adm.getTableDescriptor(TableName.valueOf(tableString));
+    }
+
+    @Override
+    public boolean tableExists(String tableString) throws IOException
+    {
+        return adm.tableExists(TableName.valueOf(tableString));
+    }
+
+    @Override
+    public void createTable(HTableDescriptor desc) throws IOException
+    {
+        adm.createTable(desc);
+    }
+
+    @Override
+    public void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException
+    {
+        adm.createTable(desc, startKey, endKey, numRegions);
+    }
+
+    @Override
+    public int getEstimatedRegionServerCount()
+    {
+        int serverCount = -1;
+        try {
+            serverCount = adm.getClusterStatus().getServers().size();
+            log.debug("Read {} servers from HBase ClusterStatus", serverCount);
+        } catch (IOException e) {
+            log.debug("Unable to retrieve HBase cluster status", e);
+        }
+        return serverCount;
+    }
+
+    @Override
+    public void disableTable(String tableString) throws IOException
+    {
+        adm.disableTable(TableName.valueOf(tableString));
+    }
+
+    @Override
+    public void enableTable(String tableString) throws IOException
+    {
+        adm.enableTable(TableName.valueOf(tableString));
+    }
+
+    @Override
+    public boolean isTableDisabled(String tableString) throws IOException
+    {
+        return adm.isTableDisabled(TableName.valueOf(tableString));
+    }
+
+    @Override
+    public void addColumn(String tableString, HColumnDescriptor columnDescriptor) throws IOException
+    {
+        adm.addColumn(TableName.valueOf(tableString), columnDescriptor);
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        adm.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java
new file mode 100644
index 0000000..c9b03aa
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+
+public interface HBaseCompat {
+
+    /**
+     * Configure the compression scheme {@code algo} on a column family
+     * descriptor {@code cd}. The {@code algo} parameter is a string value
+     * corresponding to one of the values of HBase's Compression enum. The
+     * Compression enum has moved between packages as HBase has evolved, which
+     * is why this method has a String argument in the signature instead of the
+     * enum itself.
+     *
+     * @param cd
+     *            column family to configure
+     * @param algo
+     *            compression type to use
+     */
+    public void setCompression(HColumnDescriptor cd, String algo);
+
+    /**
+     * Create and return an HTableDescriptor instance with the given name. The
+     * constructors invoked by this method have remained stable over HBase development
+     * so far, but the old HTableDescriptor(String) constructor & byte[] friends
+     * are now marked deprecated and may eventually be removed in favor of the
+     * HTableDescriptor(TableName) constructor. That constructor (and the
+     * TableName type) only exists in newer HBase versions. Hence this method.
+     *
+     * @param tableName
+     *            HBase table name
+     * @return a new table descriptor instance
+     */
+    public HTableDescriptor newTableDescriptor(String tableName);
+
+    ConnectionMask createConnection(Configuration conf) throws IOException;
+
+    void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc);
+
+    void setTimestamp(Delete d, long timestamp);
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java
new file mode 100644
index 0000000..2c0f3b4
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.io.compress.Compression;
+
+public class HBaseCompat0_98 implements HBaseCompat {
+
+    @Override
+    public void setCompression(HColumnDescriptor cd, String algo) {
+        cd.setCompressionType(Compression.Algorithm.valueOf(algo));
+    }
+
+    @Override
+    public HTableDescriptor newTableDescriptor(String tableName) {
+        TableName tn = TableName.valueOf(tableName);
+        return new HTableDescriptor(tn);
+    }
+
+    @Override
+    public ConnectionMask createConnection(Configuration conf) throws IOException
+    {
+        return new HConnection0_98(HConnectionManager.createConnection(conf));
+    }
+
+    @Override
+    public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc)
+    {
+        tdesc.addFamily(cdesc);
+    }
+
+    @Override
+    public void setTimestamp(Delete d, long timestamp)
+    {
+        d.setTimestamp(timestamp);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java
new file mode 100644
index 0000000..bb3fb3b
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.io.compress.Compression;
+
+public class HBaseCompat1_0 implements HBaseCompat {
+
+    @Override
+    public void setCompression(HColumnDescriptor cd, String algo) {
+        cd.setCompressionType(Compression.Algorithm.valueOf(algo));
+    }
+
+    @Override
+    public HTableDescriptor newTableDescriptor(String tableName) {
+        TableName tn = TableName.valueOf(tableName);
+        return new HTableDescriptor(tn);
+    }
+
+    @Override
+    public ConnectionMask createConnection(Configuration conf) throws IOException
+    {
+        return new HConnection1_0(ConnectionFactory.createConnection(conf));
+    }
+
+    @Override
+    public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc)
+    {
+        tdesc.addFamily(cdesc);
+    }
+
+    @Override
+    public void setTimestamp(Delete d, long timestamp)
+    {
+        d.setTimestamp(timestamp);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java
new file mode 100644
index 0000000..e5c3d31
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.io.compress.Compression;
+
+import java.io.IOException;
+
+public class HBaseCompat1_1 implements HBaseCompat {
+
+    @Override
+    public void setCompression(HColumnDescriptor cd, String algo) {
+        cd.setCompressionType(Compression.Algorithm.valueOf(algo));
+    }
+
+    @Override
+    public HTableDescriptor newTableDescriptor(String tableName) {
+        TableName tn = TableName.valueOf(tableName);
+        return new HTableDescriptor(tn);
+    }
+
+    @Override
+    public ConnectionMask createConnection(Configuration conf) throws IOException
+    {
+        return new HConnection1_0(ConnectionFactory.createConnection(conf));
+    }
+
+    @Override
+    public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc)
+    {
+        tdesc.addFamily(cdesc);
+    }
+
+    @Override
+    public void setTimestamp(Delete d, long timestamp)
+    {
+        d.setTimestamp(timestamp);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java
new file mode 100644
index 0000000..2c0d6fe
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HBaseCompatLoader {
+
+    private static final Logger log = LoggerFactory.getLogger(HBaseCompatLoader.class);
+
+    private static final String DEFAULT_HBASE_COMPAT_VERSION = "1.1";
+
+    private static final String DEFAULT_HBASE_CLASS_NAME = "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat1_1";
+
+    private static HBaseCompat cachedCompat;
+
+    public synchronized static HBaseCompat getCompat(String classOverride) {
+
+        if (null != cachedCompat) {
+            log.debug("Returning cached HBase compatibility layer: {}", cachedCompat);
+            return cachedCompat;
+        }
+
+        HBaseCompat compat;
+        String className = null;
+        String classNameSource = null;
+
+        if (null != classOverride) {
+            className = classOverride;
+            classNameSource = "from explicit configuration";
+        } else {
+            String hbaseVersion = VersionInfo.getVersion();
+            for (String supportedVersion : Arrays.asList("0.94", "0.96", "0.98", "1.0", "1.1")) {
+                if (hbaseVersion.startsWith(supportedVersion + ".")) {
+                    className = "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat" + supportedVersion.replaceAll("\\.", "_");
+                    classNameSource = "supporting runtime HBase version " + hbaseVersion;
+                    break;
+                }
+            }
+            if (null == className) {
+                log.info("The HBase version {} is not explicitly supported by Titan.  " +
+                         "Loading Titan's compatibility layer for its most recent supported HBase version ({})",
+                        hbaseVersion, DEFAULT_HBASE_COMPAT_VERSION);
+                className = DEFAULT_HBASE_CLASS_NAME;
+                classNameSource = "by default";
+            }
+        }
+
+        final String errTemplate = " when instantiating HBase compatibility class " + className;
+
+        try {
+            compat = (HBaseCompat)Class.forName(className).newInstance();
+            log.info("Instantiated HBase compatibility layer {}: {}", classNameSource, compat.getClass().getCanonicalName());
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e);
+        } catch (InstantiationException e) {
+            throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e);
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e);
+        }
+
+        return cachedCompat = compat;
+    }
+}
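
Taken together with the HBaseCompat interface, the loader gives callers one
entry point for all version-specific setup. A minimal sketch of the intended
call sequence (hedged illustration; the "titan" and "e" names are examples):

    // null => auto-detect the compat layer via VersionInfo.getVersion()
    HBaseCompat compat = HBaseCompatLoader.getCompat(null);
    HTableDescriptor desc = compat.newTableDescriptor("titan");
    HColumnDescriptor cf = new HColumnDescriptor("e");
    compat.setCompression(cf, "GZ");
    compat.addColumnFamilyToTableDescriptor(desc, cf);
    ConnectionMask cnx = compat.createConnection(HBaseConfiguration.create());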


[2/3] falcon git commit: FALCON-1858 Support HBase as a storage backend for Falcon Titan graphDB

Posted by pe...@apache.org.
http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java
new file mode 100644
index 0000000..b4dc12e
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java
@@ -0,0 +1,397 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.thinkaurelius.titan.core.attribute.Duration;
+import com.thinkaurelius.titan.diskstorage.*;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*;
+import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator;
+import com.thinkaurelius.titan.diskstorage.locking.PermanentLockingException;
+import com.thinkaurelius.titan.diskstorage.util.KeyColumn;
+import com.thinkaurelius.titan.diskstorage.util.RecordIterator;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntryList;
+import com.thinkaurelius.titan.diskstorage.util.time.Timepoint;
+import com.thinkaurelius.titan.diskstorage.util.time.Timestamps;
+import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
+import com.thinkaurelius.titan.util.system.IOUtils;
+
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.filter.ColumnPaginationFilter;
+import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import static com.thinkaurelius.titan.diskstorage.EntryMetaData.*;
+
+/**
+ * Here are some areas that might need work:
+ * <p/>
+ * - batching? (consider HTable#batch, HTable#setAutoFlush(false))
+ * - tuning HTable#setWriteBufferSize (?)
+ * - writing a server-side filter to replace ColumnCountGetFilter, which drops
+ * all columns on the row where it reaches its limit.  This requires getSlice,
+ * currently, to impose its limit on the client side.  That obviously won't
+ * scale.
+ * - RowMutations for combining Puts+Deletes (need a newer HBase than 0.92 for this)
+ * - (maybe) fiddle with HTable#setRegionCachePrefetch and/or #prewarmRegionCache
+ * <p/>
+ * There may be other problem areas.  These are just the ones of which I'm aware.
+ */
+public class HBaseKeyColumnValueStore implements KeyColumnValueStore {
+
+    private static final Logger logger = LoggerFactory.getLogger(HBaseKeyColumnValueStore.class);
+
+    private final String tableName;
+    private final HBaseStoreManager storeManager;
+
+    // When using shortened CF names, columnFamily is the shortname and storeName is the longname
+    // When not using shortened CF names, they are the same
+    //private final String columnFamily;
+    private final String storeName;
+    // This is columnFamily.getBytes()
+    private final byte[] columnFamilyBytes;
+    private final HBaseGetter entryGetter;
+
+    private final ConnectionMask cnx;
+
+    private LocalLockMediator<StoreTransaction> localLockMediator;
+
+    private Duration lockExpiryTime;
+
+    HBaseKeyColumnValueStore(HBaseStoreManager storeManager, ConnectionMask cnx, String tableName, String columnFamily, String storeName, LocalLockMediator<StoreTransaction> llm) {
+        this.storeManager = storeManager;
+        this.cnx = cnx;
+        this.tableName = tableName;
+        //this.columnFamily = columnFamily;
+        this.storeName = storeName;
+        this.columnFamilyBytes = columnFamily.getBytes();
+        this.entryGetter = new HBaseGetter(storeManager.getMetaDataSchema(storeName));
+        this.localLockMediator = llm;
+        this.lockExpiryTime = storeManager.getStorageConfig().get(GraphDatabaseConfiguration.LOCK_EXPIRE);
+    }
+
+    @Override
+    public void close() throws BackendException {
+    }
+
+    @Override
+    public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException {
+        Map<StaticBuffer, EntryList> result = getHelper(Arrays.asList(query.getKey()), getFilter(query));
+        return Iterables.getOnlyElement(result.values(), EntryList.EMPTY_LIST);
+    }
+
+    @Override
+    public Map<StaticBuffer,EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException {
+        return getHelper(keys, getFilter(query));
+    }
+
+    @Override
+    public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {
+        Map<StaticBuffer, KCVMutation> mutations = ImmutableMap.of(key, new KCVMutation(additions, deletions));
+        mutateMany(mutations, txh);
+    }
+
+    @Override
+    public void acquireLock(StaticBuffer key,
+                            StaticBuffer column,
+                            StaticBuffer expectedValue,
+                            StoreTransaction txh) throws BackendException {
+
+        KeyColumn lockID = new KeyColumn(key, column);
+        logger.debug("Attempting to acquireLock on {} ", lockID);
+        final Timepoint lockStartTime = Timestamps.NANO.getTime(System.nanoTime(), TimeUnit.NANOSECONDS);
+        boolean locked = localLockMediator.lock(lockID, txh, lockStartTime.add(lockExpiryTime));
+        if (!locked) {
+            throw new PermanentLockingException("Could not lock the keyColumn " + lockID + " on CF " + Bytes.toString(columnFamilyBytes));
+        }
+        ((HBaseTransaction) txh).updateLocks(lockID, expectedValue);
+    }
+
+    @Override
+    public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh) throws BackendException {
+        return executeKeySliceQuery(query.getKeyStart().as(StaticBuffer.ARRAY_FACTORY),
+                query.getKeyEnd().as(StaticBuffer.ARRAY_FACTORY),
+                new FilterList(FilterList.Operator.MUST_PASS_ALL),
+                query);
+    }
+
+    @Override
+    public String getName() {
+        return storeName;
+    }
+
+    @Override
+    public KeyIterator getKeys(SliceQuery query, StoreTransaction txh) throws BackendException {
+        return executeKeySliceQuery(new FilterList(FilterList.Operator.MUST_PASS_ALL), query);
+    }
+
+    public static Filter getFilter(SliceQuery query) {
+        byte[] colStartBytes = query.getSliceStart().length() > 0 ? query.getSliceStart().as(StaticBuffer.ARRAY_FACTORY) : null;
+        byte[] colEndBytes = query.getSliceEnd().length() > 0 ? query.getSliceEnd().as(StaticBuffer.ARRAY_FACTORY) : null;
+
+        Filter filter = new ColumnRangeFilter(colStartBytes, true, colEndBytes, false);
+
+        if (query.hasLimit()) {
+            filter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
+                    filter,
+                    new ColumnPaginationFilter(query.getLimit(), 0));
+        }
+
+        logger.debug("Generated HBase Filter {}", filter);
+
+        return filter;
+    }
+
+    private Map<StaticBuffer,EntryList> getHelper(List<StaticBuffer> keys, Filter getFilter) throws BackendException {
+        List<Get> requests = new ArrayList<Get>(keys.size());
+        for (StaticBuffer key : keys) {
+            Get g = new Get(key.as(StaticBuffer.ARRAY_FACTORY)).addFamily(columnFamilyBytes).setFilter(getFilter);
+            try {
+                g.setTimeRange(0, Long.MAX_VALUE);
+            } catch (IOException e) {
+                throw new PermanentBackendException(e);
+            }
+            requests.add(g);
+        }
+
+        Map<StaticBuffer,EntryList> resultMap = new HashMap<StaticBuffer,EntryList>(keys.size());
+
+        try {
+            TableMask table = null;
+            Result[] results = null;
+
+            try {
+                table = cnx.getTable(tableName);
+                logger.debug("Get requests {} {} ", Bytes.toString(columnFamilyBytes), requests.size());
+                results = table.get(requests);
+                logger.debug("Get requests finished {} {} ", Bytes.toString(columnFamilyBytes), requests.size());
+            } finally {
+                IOUtils.closeQuietly(table);
+            }
+
+            if (results == null)
+                return KCVSUtil.emptyResults(keys);
+
+            assert results.length==keys.size();
+
+            for (int i = 0; i < results.length; i++) {
+                Result result = results[i];
+                NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> f = result.getMap();
+
+                if (f == null) { // no result for this key
+                    resultMap.put(keys.get(i), EntryList.EMPTY_LIST);
+                    continue;
+                }
+
+                // actual key with <timestamp, value>
+                NavigableMap<byte[], NavigableMap<Long, byte[]>> r = f.get(columnFamilyBytes);
+                resultMap.put(keys.get(i), (r == null)
+                                            ? EntryList.EMPTY_LIST
+                                            : StaticArrayEntryList.ofBytes(r.entrySet(), entryGetter));
+            }
+
+            return resultMap;
+        } catch (IOException e) {
+            throw new TemporaryBackendException(e);
+        }
+    }
+
+    private void mutateMany(Map<StaticBuffer, KCVMutation> mutations, StoreTransaction txh) throws BackendException {
+        storeManager.mutateMany(ImmutableMap.of(storeName, mutations), txh);
+    }
+
+    private KeyIterator executeKeySliceQuery(FilterList filters, @Nullable SliceQuery columnSlice) throws BackendException {
+        return executeKeySliceQuery(null, null, filters, columnSlice);
+    }
+
+    private KeyIterator executeKeySliceQuery(@Nullable byte[] startKey,
+                                            @Nullable byte[] endKey,
+                                            FilterList filters,
+                                            @Nullable SliceQuery columnSlice) throws BackendException {
+        Scan scan = new Scan().addFamily(columnFamilyBytes);
+
+        try {
+            scan.setTimeRange(0, Long.MAX_VALUE);
+        } catch (IOException e) {
+            throw new PermanentBackendException(e);
+        }
+
+        if (startKey != null)
+            scan.setStartRow(startKey);
+
+        if (endKey != null)
+            scan.setStopRow(endKey);
+
+        if (columnSlice != null) {
+            filters.addFilter(getFilter(columnSlice));
+        }
+
+        TableMask table = null;
+
+        logger.debug("Scan for row keys {} {} ", Bytes.toString(startKey), Bytes.toString(endKey));
+        try {
+            table = cnx.getTable(tableName);
+            return new RowIterator(table, table.getScanner(scan.setFilter(filters)), columnFamilyBytes);
+        } catch (IOException e) {
+            IOUtils.closeQuietly(table);
+            throw new PermanentBackendException(e);
+        }
+    }
+
+    private class RowIterator implements KeyIterator {
+        private final Closeable table;
+        private final Iterator<Result> rows;
+        private final byte[] columnFamilyBytes;
+
+        private Result currentRow;
+        private boolean isClosed;
+
+        public RowIterator(Closeable table, ResultScanner rows, byte[] columnFamilyBytes) {
+            this.table = table;
+            this.columnFamilyBytes = Arrays.copyOf(columnFamilyBytes, columnFamilyBytes.length);
+            this.rows = Iterators.filter(rows.iterator(), new Predicate<Result>() {
+                @Override
+                public boolean apply(@Nullable Result result) {
+                    if (result == null)
+                        return false;
+
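+                    // Keep only rows whose key parses as a long id; others are filtered out.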
+                    try {
+                        StaticBuffer id = StaticArrayBuffer.of(result.getRow());
+                        id.getLong(0);
+                    } catch (NumberFormatException e) {
+                        return false;
+                    }
+
+                    return true;
+                }
+            });
+        }
+
+        @Override
+        public RecordIterator<Entry> getEntries() {
+            ensureOpen();
+
+            return new RecordIterator<Entry>() {
+                private final Iterator<Map.Entry<byte[], NavigableMap<Long, byte[]>>> kv = currentRow.getMap().get(columnFamilyBytes).entrySet().iterator();
+
+                @Override
+                public boolean hasNext() {
+                    ensureOpen();
+                    return kv.hasNext();
+                }
+
+                @Override
+                public Entry next() {
+                    ensureOpen();
+                    return StaticArrayEntry.ofBytes(kv.next(), entryGetter);
+                }
+
+                @Override
+                public void close() {
+                    isClosed = true;
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
+
+        @Override
+        public boolean hasNext() {
+            ensureOpen();
+            return rows.hasNext();
+        }
+
+        @Override
+        public StaticBuffer next() {
+            ensureOpen();
+
+            currentRow = rows.next();
+            return StaticArrayBuffer.of(currentRow.getRow());
+        }
+
+        @Override
+        public void close() {
+            IOUtils.closeQuietly(table);
+            isClosed = true;
+            logger.debug("RowIterator closed table {}", table);
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+        private void ensureOpen() {
+            if (isClosed)
+                throw new IllegalStateException("Iterator has been closed.");
+        }
+    }
+
+    private static class HBaseGetter implements StaticArrayEntry.GetColVal<Map.Entry<byte[], NavigableMap<Long, byte[]>>, byte[]> {
+
+        private final EntryMetaData[] schema;
+
+        private HBaseGetter(EntryMetaData[] schema) {
+            this.schema = schema;
+        }
+
+        @Override
+        public byte[] getColumn(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
+            return element.getKey();
+        }
+
+        @Override
+        public byte[] getValue(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
+            return element.getValue().lastEntry().getValue();
+        }
+
+        @Override
+        public EntryMetaData[] getMetaSchema(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
+            return schema;
+        }
+
+        @Override
+        public Object getMetaData(Map.Entry<byte[], NavigableMap<Long, byte[]>> element, EntryMetaData meta) {
+            switch(meta) {
+                case TIMESTAMP:
+                    return element.getValue().lastEntry().getKey();
+                default:
+                    throw new UnsupportedOperationException("Unsupported meta data: " + meta);
+            }
+        }
+    }
+}
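
The filter logic in getFilter composes stock HBase filters; an equivalent
standalone sketch (illustrative bounds and limit, not taken from the store):

    byte[] start = new byte[]{0x00};   // slice start, inclusive
    byte[] end   = new byte[]{0x10};   // slice end, exclusive
    Filter range = new ColumnRangeFilter(start, true, end, false);
    // a limited slice must pass both the range and the pagination filter
    Filter limited = new FilterList(FilterList.Operator.MUST_PASS_ALL,
            range, new ColumnPaginationFilter(10, 0));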

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseStoreManager.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseStoreManager.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseStoreManager.java
new file mode 100644
index 0000000..52f28af
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseStoreManager.java
@@ -0,0 +1,926 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.thinkaurelius.titan.diskstorage.Backend;
+import com.thinkaurelius.titan.diskstorage.configuration.ConfigElement;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.CustomizeStoreKCVSManager;
+import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator;
+import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediators;
+import com.thinkaurelius.titan.diskstorage.util.time.Timestamps;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import com.thinkaurelius.titan.core.TitanException;
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig;
+import com.thinkaurelius.titan.diskstorage.Entry;
+import com.thinkaurelius.titan.diskstorage.PermanentBackendException;
+import com.thinkaurelius.titan.diskstorage.StaticBuffer;
+import com.thinkaurelius.titan.diskstorage.TemporaryBackendException;
+import com.thinkaurelius.titan.diskstorage.common.DistributedStoreManager;
+import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace;
+import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption;
+import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KCVMutation;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStore;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyRange;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StandardStoreFeatures;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreFeatures;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
+import com.thinkaurelius.titan.diskstorage.util.BufferUtil;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
+import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
+import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions;
+import com.thinkaurelius.titan.util.system.IOUtils;
+import com.thinkaurelius.titan.util.system.NetworkUtil;
+
+/**
+ * Storage Manager for HBase
+ *
+ * @author Dan LaRocque <da...@hopcount.org>
+ */
+@PreInitializeConfigOptions
+public class HBaseStoreManager extends DistributedStoreManager implements KeyColumnValueStoreManager, CustomizeStoreKCVSManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(HBaseStoreManager.class);
+
+    public static final ConfigNamespace HBASE_NS =
+            new ConfigNamespace(GraphDatabaseConfiguration.STORAGE_NS, "hbase", "HBase storage options");
+
+    public static final ConfigOption<Boolean> SHORT_CF_NAMES =
+            new ConfigOption<Boolean>(HBASE_NS, "short-cf-names",
+            "Whether to shorten the names of Titan's column families to one-character mnemonics " +
+            "to conserve storage space", ConfigOption.Type.FIXED, true);
+
+    public static final String COMPRESSION_DEFAULT = "-DEFAULT-";
+
+    public static final ConfigOption<String> COMPRESSION =
+            new ConfigOption<String>(HBASE_NS, "compression-algorithm",
+            "An HBase Compression.Algorithm enum string which will be applied to newly created column families. " +
+            "The compression algorithm must be installed and available on the HBase cluster.  Titan cannot install " +
+            "and configure new compression algorithms on the HBase cluster by itself.",
+            ConfigOption.Type.MASKABLE, "GZ");
+
+    public static final ConfigOption<Boolean> SKIP_SCHEMA_CHECK =
+            new ConfigOption<Boolean>(HBASE_NS, "skip-schema-check",
+            "Assume that Titan's HBase table and column families already exist. " +
+            "When this is true, Titan will not check for the existence of its table/CFs, " +
+            "nor will it attempt to create them under any circumstances.  This is useful " +
+            "when running Titan without HBase admin privileges.",
+            ConfigOption.Type.MASKABLE, false);
+
+    public static final ConfigOption<String> HBASE_TABLE =
+            new ConfigOption<String>(HBASE_NS, "table",
+            "The name of the table Titan will use.  When " + ConfigElement.getPath(SKIP_SCHEMA_CHECK) +
+            " is false, Titan will automatically create this table if it does not already exist.",
+            ConfigOption.Type.LOCAL, "titan");
+
+    /**
+     * Related bug fixed in 0.98.0, 0.94.7, 0.95.0:
+     *
+     * https://issues.apache.org/jira/browse/HBASE-8170
+     */
+    public static final int MIN_REGION_COUNT = 3;
+
+    /**
+     * The total number of HBase regions to create with Titan's table. This
+     * setting only affects table creation; this normally happens just once when
+     * Titan connects to an HBase backend for the first time.
+     */
+    public static final ConfigOption<Integer> REGION_COUNT =
+            new ConfigOption<Integer>(HBASE_NS, "region-count",
+            "The number of initial regions set when creating Titan's HBase table",
+            ConfigOption.Type.MASKABLE, Integer.class, new Predicate<Integer>() {
+                @Override
+                public boolean apply(Integer input) {
+                    return null != input && MIN_REGION_COUNT <= input;
+                }
+            }
+    );
+
+    /**
+     * This setting is used only when {@link #REGION_COUNT} is unset.
+     * <p/>
+     * If Titan's HBase table does not exist, then it will be created with total
+     * region count = (number of servers reported by ClusterStatus) * (this
+     * value).
+     * <p/>
+     * The Apache HBase manual suggests an order-of-magnitude range of potential
+     * values for this setting:
+     *
+     * <ul>
+     *  <li>
+     *   <a href="https://hbase.apache.org/book/important_configurations.html#disable.splitting">2.5.2.7. Managed Splitting</a>:
+     *   <blockquote>
+     *    What's the optimal number of pre-split regions to create? Mileage will
+     *    vary depending upon your application. You could start low with 10
+     *    pre-split regions / server and watch as data grows over time. It's
+     *    better to err on the side of too little regions and rolling split later.
+     *   </blockquote>
+     *  </li>
+     *  <li>
+     *   <a href="https://hbase.apache.org/book/regions.arch.html">9.7 Regions</a>:
+     *   <blockquote>
+     *    In general, HBase is designed to run with a small (20-200) number of
+     *    relatively large (5-20Gb) regions per server... Typically you want to
+     *    keep your region count low on HBase for numerous reasons. Usually
+     *    right around 100 regions per RegionServer has yielded the best results.
+     *   </blockquote>
+     *  </li>
+     * </ul>
+     *
+     * These considerations may differ for other HBase implementations (e.g. MapR).
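+     * <p/>
+     * Worked example (illustrative numbers): with 10 region servers reported
+     * by ClusterStatus and this option set to 2, a newly created Titan table
+     * is pre-split into 10 * 2 = 20 regions.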
+     */
+    public static final ConfigOption<Integer> REGIONS_PER_SERVER =
+            new ConfigOption<Integer>(HBASE_NS, "regions-per-server",
+            "The number of regions per regionserver to set when creating Titan's HBase table",
+            ConfigOption.Type.MASKABLE, Integer.class);
+
+    /**
+     * If this key is present in either the JVM system properties or the process
+     * environment (checked in the listed order, first hit wins), then its value
+     * must be the full package and class name of an implementation of
+     * {@link HBaseCompat} that has a no-arg public constructor.
+     * <p>
+     * When this <b>is not</b> set, Titan attempts to automatically detect the
+     * HBase runtime version by calling {@link VersionInfo#getVersion()}. Titan
+     * then checks the returned version string against a hard-coded list of
+     * supported version prefixes and instantiates the associated compat layer
+     * if a match is found.
+     * <p>
+     * When this <b>is</b> set, Titan will not call
+     * {@code VersionInfo.getVersion()} or read its hard-coded list of supported
+     * version prefixes. Titan will instead attempt to instantiate the class
+     * specified (via the no-arg constructor which must exist) and then attempt
+     * to cast it to HBaseCompat and use it as such. Titan will assume the
+     * supplied implementation is compatible with the runtime HBase version and
+     * make no attempt to verify that assumption.
+     * <p>
+     * Setting this key incorrectly could cause runtime exceptions at best or
+     * silent data corruption at worst. This setting is intended for users
+     * running exotic HBase implementations that don't support VersionInfo or
+     * implementations which return values from {@code VersionInfo.getVersion()}
+     * that are inconsistent with Apache's versioning convention. It may also be
+     * useful to users who want to run against a new release of HBase that Titan
+     * doesn't yet officially support.
+     *
+     */
+    public static final ConfigOption<String> COMPAT_CLASS =
+            new ConfigOption<String>(HBASE_NS, "compat-class",
+            "The package and class name of the HBaseCompat implementation. HBaseCompat masks version-specific HBase API differences. " +
+            "When this option is unset, Titan calls HBase's VersionInfo.getVersion() and loads the matching compat class " +
+            "at runtime.  Setting this option forces Titan to instead reflectively load and instantiate the specified class.",
+            ConfigOption.Type.MASKABLE, String.class);
+
+    public static final int PORT_DEFAULT = 9160;
+
+    public static final Timestamps PREFERRED_TIMESTAMPS = Timestamps.MILLI;
+
+    public static final ConfigNamespace HBASE_CONFIGURATION_NAMESPACE =
+            new ConfigNamespace(HBASE_NS, "ext", "Overrides for hbase-{site,default}.xml options", true);
+
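+    /*
+     * Illustrative sketch of a Titan properties file exercising the options
+     * above (key paths assumed from the declared namespaces; values are
+     * examples, not recommendations):
+     *
+     *   storage.backend=hbase
+     *   storage.hbase.table=titan
+     *   storage.hbase.compression-algorithm=GZ
+     *   storage.hbase.compat-class=com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat1_0
+     *   storage.hbase.ext.hbase.zookeeper.quorum=zk1,zk2,zk3
+     */
+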
+    private static final BiMap<String, String> SHORT_CF_NAME_MAP =
+            ImmutableBiMap.<String, String>builder()
+                    .put(Backend.INDEXSTORE_NAME, "g")
+                    .put(Backend.INDEXSTORE_NAME + Backend.LOCK_STORE_SUFFIX, "h")
+                    .put(Backend.ID_STORE_NAME, "i")
+                    .put(Backend.EDGESTORE_NAME, "e")
+                    .put(Backend.EDGESTORE_NAME + Backend.LOCK_STORE_SUFFIX, "f")
+                    .put(GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME, "s")
+                    .put(GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME + Backend.LOCK_STORE_SUFFIX, "t")
+                    .put(Backend.SYSTEM_MGMT_LOG_NAME, "m")
+                    .put(Backend.SYSTEM_TX_LOG_NAME, "l")
+                    .build();
+
+    private static final StaticBuffer FOUR_ZERO_BYTES = BufferUtil.zeroBuffer(4);
+
+    static {
+        // Verify that shortCfNameMap is injective
+        // Should be guaranteed by Guava BiMap, but it doesn't hurt to check
+        Preconditions.checkArgument(null != SHORT_CF_NAME_MAP);
+        Collection<String> shorts = SHORT_CF_NAME_MAP.values();
+        Preconditions.checkArgument(Sets.newHashSet(shorts).size() == shorts.size());
+    }
+
+    // Immutable instance fields
+    private final String tableName;
+    private final String compression;
+    private final int regionCount;
+    private final int regionsPerServer;
+    private final ConnectionMask cnx;
+    private final org.apache.hadoop.conf.Configuration hconf;
+    private final boolean shortCfNames;
+    private final boolean skipSchemaCheck;
+    private final String compatClass;
+    private final HBaseCompat compat;
+
+    private static final ConcurrentHashMap<HBaseStoreManager, Throwable> openManagers =
+            new ConcurrentHashMap<HBaseStoreManager, Throwable>();
+
+    // Mutable instance state
+    private final ConcurrentMap<String, HBaseKeyColumnValueStore> openStores;
+
+    private LocalLockMediator<StoreTransaction> llm;
+
+    public HBaseStoreManager(com.thinkaurelius.titan.diskstorage.configuration.Configuration config) throws BackendException {
+        super(config, PORT_DEFAULT);
+
+        checkConfigDeprecation(config);
+
+        this.tableName = config.get(HBASE_TABLE);
+        this.compression = config.get(COMPRESSION);
+        this.regionCount = config.has(REGION_COUNT) ? config.get(REGION_COUNT) : -1;
+        this.regionsPerServer = config.has(REGIONS_PER_SERVER) ? config.get(REGIONS_PER_SERVER) : -1;
+        this.skipSchemaCheck = config.get(SKIP_SCHEMA_CHECK);
+        this.compatClass = config.has(COMPAT_CLASS) ? config.get(COMPAT_CLASS) : null;
+        this.compat = HBaseCompatLoader.getCompat(compatClass);
+
+        /*
+         * Specifying both region count options is permitted but may be
+         * indicative of a misunderstanding, so issue a warning.
+         */
+        if (config.has(REGIONS_PER_SERVER) && config.has(REGION_COUNT)) {
+            logger.warn("Both {} and {} are set in Titan's configuration, but "
+                      + "the former takes precedence and the latter will be ignored.",
+                        REGION_COUNT, REGIONS_PER_SERVER);
+        }
+
+        /* This static factory calls HBaseConfiguration.addHbaseResources(),
+         * which in turn applies the contents of hbase-default.xml and then
+         * applies the contents of hbase-site.xml.
+         */
+        this.hconf = HBaseConfiguration.create();
+
+        // Copy a subset of our commons config into a Hadoop config
+        int keysLoaded=0;
+        Map<String,Object> configSub = config.getSubset(HBASE_CONFIGURATION_NAMESPACE);
+        for (Map.Entry<String,Object> entry : configSub.entrySet()) {
+            logger.info("HBase configuration: setting {}={}", entry.getKey(), entry.getValue());
+            if (entry.getValue()==null) continue;
+            hconf.set(entry.getKey(), entry.getValue().toString());
+            keysLoaded++;
+        }
+
+        // Special case for STORAGE_HOSTS
+        if (config.has(GraphDatabaseConfiguration.STORAGE_HOSTS)) {
+            String zkQuorumKey = "hbase.zookeeper.quorum";
+            String csHostList = Joiner.on(",").join(config.get(GraphDatabaseConfiguration.STORAGE_HOSTS));
+            hconf.set(zkQuorumKey, csHostList);
+            logger.info("Copied host list from {} to {}: {}", GraphDatabaseConfiguration.STORAGE_HOSTS, zkQuorumKey, csHostList);
+        }
+
+        logger.debug("HBase configuration: set a total of {} configuration values", keysLoaded);
+
+        this.shortCfNames = config.get(SHORT_CF_NAMES);
+
+        try {
+            this.cnx = compat.createConnection(hconf);
+        } catch (IOException e) {
+            throw new PermanentBackendException(e);
+        }
+
+        if (logger.isTraceEnabled()) {
+            openManagers.put(this, new Throwable("Manager Opened"));
+            dumpOpenManagers();
+        }
+
+        logger.debug("Dumping HBase config key=value pairs");
+        for (Map.Entry<String, String> entry : hconf) {
+            logger.debug("[HBaseConfig] " + entry.getKey() + "=" + entry.getValue());
+        }
+        logger.debug("End of HBase config key=value pairs");
+
+        openStores = new ConcurrentHashMap<String, HBaseKeyColumnValueStore>();
+    }
+
+    @Override
+    public Deployment getDeployment() {
+        List<KeyRange> local;
+        try {
+            local = getLocalKeyPartition();
+            return null != local && !local.isEmpty() ? Deployment.LOCAL : Deployment.REMOTE;
+        } catch (BackendException e) {
+            // propagating BackendException might be a better approach
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "hbase[" + tableName + "@" + super.toString() + "]";
+    }
+
+    public void dumpOpenManagers() {
+        int estimatedSize = openManagers.size();
+        logger.trace("---- Begin open HBase store manager list ({} managers) ----", estimatedSize);
+        for (HBaseStoreManager m : openManagers.keySet()) {
+            logger.trace("Manager {} opened at:", m, openManagers.get(m));
+        }
+        logger.trace("----   End open HBase store manager list ({} managers)  ----", estimatedSize);
+    }
+
+    @Override
+    public void close() {
+        openStores.clear();
+        if (logger.isTraceEnabled())
+            openManagers.remove(this);
+        IOUtils.closeQuietly(cnx);
+    }
+
+    @Override
+    public StoreFeatures getFeatures() {
+
+        Configuration c = GraphDatabaseConfiguration.buildConfiguration();
+
+        StandardStoreFeatures.Builder fb = new StandardStoreFeatures.Builder()
+                .orderedScan(true).unorderedScan(true).batchMutation(true)
+                .multiQuery(true).distributed(true).keyOrdered(true).storeTTL(true)
+                .timestamps(true).preferredTimestamps(PREFERRED_TIMESTAMPS)
+                .locking(true)
+                .keyConsistent(c);
+
+        try {
+            fb.localKeyPartition(getDeployment() == Deployment.LOCAL);
+        } catch (Exception e) {
+            logger.warn("Unexpected exception during getDeployment()", e);
+        }
+
+        return fb.build();
+    }
+
+    @Override
+    public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException {
+        logger.debug("Enter mutateMany");
+        final MaskedTimestamp commitTime = new MaskedTimestamp(txh);
+        // In case of an addition and deletion with identical timestamps, the
+        // deletion tombstone wins.
+        // http://hbase.apache.org/book/versions.html#d244e4250
+        Map<StaticBuffer, Pair<Put, Delete>> commandsPerKey =
+                convertToCommands(
+                        mutations,
+                        commitTime.getAdditionTime(times.getUnit()),
+                        commitTime.getDeletionTime(times.getUnit()));
+
+        List<Row> batch = new ArrayList<Row>(commandsPerKey.size()); // actual batch operation
+
+        // convert sorted commands into representation required for 'batch' operation
+        for (Pair<Put, Delete> commands : commandsPerKey.values()) {
+            if (commands.getFirst() != null)
+                batch.add(commands.getFirst());
+
+            if (commands.getSecond() != null)
+                batch.add(commands.getSecond());
+        }
+
+        try {
+            TableMask table = null;
+
+            try {
+                table = cnx.getTable(tableName);
+                logger.debug("mutateMany : batch mutate started size {} ", batch.size());
+                table.batch(batch, new Object[batch.size()]);
+                logger.debug("mutateMany : batch mutate finished {} ", batch.size());
+            } finally {
+                IOUtils.closeQuietly(table);
+            }
+        } catch (IOException e) {
+            throw new TemporaryBackendException(e);
+        } catch (InterruptedException e) {
+            throw new TemporaryBackendException(e);
+        }
+
+        sleepAfterWrite(txh, commitTime);
+    }
+
+    @Override
+    public KeyColumnValueStore openDatabase(String longName) throws BackendException {
+
+        return openDatabase(longName, -1);
+    }
+
+    @Override
+    public KeyColumnValueStore openDatabase(final String longName, int ttlInSeconds) throws BackendException {
+
+        HBaseKeyColumnValueStore store = openStores.get(longName);
+
+        if (store == null) {
+            final String cfName = shortCfNames ? shortenCfName(longName) : longName;
+
+            final String llmPrefix = getName();
+            llm = LocalLockMediators.INSTANCE.<StoreTransaction>get(llmPrefix, times);
+            HBaseKeyColumnValueStore newStore = new HBaseKeyColumnValueStore(this, cnx, tableName, cfName, longName, llm);
+
+            store = openStores.putIfAbsent(longName, newStore); // nothing bad happens if we lose to another thread
+
+            if (store == null) {
+                if (!skipSchemaCheck)
+                    ensureColumnFamilyExists(tableName, cfName, ttlInSeconds);
+
+                store = newStore;
+            }
+            logger.info("Loaded 1.x Hbase Client Store Manager");
+        }
+
+        return store;
+    }
+
+
+    @Override
+    public StoreTransaction beginTransaction(final BaseTransactionConfig config) throws BackendException {
+        return new HBaseTransaction(config, llm);
+    }
+
+    @Override
+    public String getName() {
+        return tableName;
+    }
+
+    /**
+     * Deletes the specified table with all its columns.
+     * ATTENTION: Invoking this method will delete the table if it exists and therefore causes data loss.
+     */
+    @Override
+    public void clearStorage() throws BackendException {
+        try (AdminMask adm = getAdminInterface()) {
+            adm.clearTable(tableName, times.getTime().getNativeTimestamp());
+        } catch (IOException e) {
+            throw new TemporaryBackendException(e);
+        }
+    }
+
+    @Override
+    public List<KeyRange> getLocalKeyPartition() throws BackendException {
+
+        List<KeyRange> result = new LinkedList<KeyRange>();
+
+        HTable table = null;
+        try {
+            ensureTableExists(tableName, getCfNameForStoreName(GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME), 0);
+
+            table = new HTable(hconf, tableName);
+
+            Map<KeyRange, ServerName> normed =
+                    normalizeKeyBounds(table.getRegionLocations());
+
+            for (Map.Entry<KeyRange, ServerName> e : normed.entrySet()) {
+                if (NetworkUtil.isLocalConnection(e.getValue().getHostname())) {
+                    result.add(e.getKey());
+                    logger.debug("Found local key/row partition {} on host {}", e.getKey(), e.getValue());
+                } else {
+                    logger.debug("Discarding remote {}", e.getValue());
+                }
+            }
+        } catch (MasterNotRunningException e) {
+            logger.warn("Unexpected MasterNotRunningException", e);
+        } catch (ZooKeeperConnectionException e) {
+            logger.warn("Unexpected ZooKeeperConnectionException", e);
+        } catch (IOException e) {
+            logger.warn("Unexpected IOException", e);
+        } finally {
+            IOUtils.closeQuietly(table);
+        }
+        return result;
+    }
+
+    /**
+     * Given a map produced by {@link HTable#getRegionLocations()}, transform
+     * each key from an {@link HRegionInfo} to a {@link KeyRange} expressing the
+     * region's start and end key bounds using Titan-partitioning-friendly
+     * conventions (start inclusive, end exclusive, zero bytes appended where
+     * necessary to make all keys at least 4 bytes long).
+     * <p/>
+     * This method iterates over the entries in its map parameter and performs
+     * the following conditional conversions on its keys. "Require" below means
+     * either a {@link Preconditions} invocation or an assertion. HRegionInfo
+     * sometimes returns start and end keys of zero length; this method replaces
+     * zero length keys with null before doing any of the checks described
+     * below. The parameter map and the values it contains are only read and
+     * never modified.
+     *
+     * <ul>
+     * <li>If an entry's HRegionInfo has null start and end keys, then first
+     * require that the parameter map is a singleton, and then return a
+     * single-entry map whose {@code KeyRange} has start and end buffers that
+     * are both four bytes of zeros.</li>
+     * <li>If the entry has a null end key (but non-null start key), put an
+     * equivalent entry in the result map with a start key identical to the
+     * input, except that zeros are appended to values less than 4 bytes long,
+     * and an end key that is four bytes of zeros.
+     * <li>If the entry has a null start key (but non-null end key), put an
+     * equivalent entry in the result map where the start key is four bytes of
+     * zeros, and the end key has zeros appended, if necessary, to make it at
+     * least 4 bytes long.</li>
+     * <li>Any entry which matches none of the above criteria results in an
+     * equivalent entry in the returned map, except that zeros are appended to
+     * both keys to make each at least 4 bytes long. HBase region end keys are
+     * already exclusive, so they carry over directly as Titan's exclusive
+     * {@code KeyRange} end bounds.</li>
+     * </ul>
+     *
+     * After iterating over the parameter map, this method checks that it either
+     * saw no entries with null keys, one entry with a null start key and a
+     * different entry with a null end key, or one entry with both start and end
+     * keys null. If any null keys are observed besides these three cases, the
+     * method will die with a precondition failure.
+     *
+     * @param raw
+     *            A map of HRegionInfo and ServerName from HBase
+     * @return Titan-friendly expression of each region's rowkey boundaries
+     */
+    private Map<KeyRange, ServerName> normalizeKeyBounds(NavigableMap<HRegionInfo, ServerName> raw) {
+
+        Map.Entry<HRegionInfo, ServerName> nullStart = null;
+        Map.Entry<HRegionInfo, ServerName> nullEnd = null;
+
+        ImmutableMap.Builder<KeyRange, ServerName> b = ImmutableMap.builder();
+
+        for (Map.Entry<HRegionInfo, ServerName> e : raw.entrySet()) {
+            HRegionInfo regionInfo = e.getKey();
+            byte[] startKey = regionInfo.getStartKey();
+            byte[] endKey   = regionInfo.getEndKey();
+
+            if (0 == startKey.length) {
+                startKey = null;
+                logger.trace("Converted zero-length HBase startKey byte array to null");
+            }
+
+            if (0 == endKey.length) {
+                endKey = null;
+                logger.trace("Converted zero-length HBase endKey byte array to null");
+            }
+
+            if (null == startKey && null == endKey) {
+                Preconditions.checkState(1 == raw.size());
+                logger.debug("HBase table {} has a single region {}", tableName, regionInfo);
+                // Choose arbitrary shared value = startKey = endKey
+                return b.put(new KeyRange(FOUR_ZERO_BYTES, FOUR_ZERO_BYTES), e.getValue()).build();
+            } else if (null == startKey) {
+                logger.debug("Found HRegionInfo with null startKey on server {}: {}", e.getValue(), regionInfo);
+                Preconditions.checkState(null == nullStart);
+                nullStart = e;
+                // I thought endBuf would be inclusive from the HBase javadoc, but in practice it is exclusive
+                StaticBuffer endBuf = StaticArrayBuffer.of(zeroExtend(endKey));
+                // Replace null start key with zeroes
+                b.put(new KeyRange(FOUR_ZERO_BYTES, endBuf), e.getValue());
+            } else if (null == endKey) {
+                logger.debug("Found HRegionInfo with null endKey on server {}: {}", e.getValue(), regionInfo);
+                Preconditions.checkState(null == nullEnd);
+                nullEnd = e;
+                // Replace null end key with zeroes
+                b.put(new KeyRange(StaticArrayBuffer.of(zeroExtend(startKey)), FOUR_ZERO_BYTES), e.getValue());
+            } else {
+                Preconditions.checkState(null != startKey);
+                Preconditions.checkState(null != endKey);
+
+                // HBase region end keys are already exclusive; just pad both keys to at least 4 bytes
+                StaticBuffer startBuf = StaticArrayBuffer.of(zeroExtend(startKey));
+                StaticBuffer endBuf = StaticArrayBuffer.of(zeroExtend(endKey));
+
+                KeyRange kr = new KeyRange(startBuf, endBuf);
+                b.put(kr, e.getValue());
+                logger.debug("Found HRegionInfo with non-null end and start keys on server {}: {}", e.getValue(), regionInfo);
+            }
+        }
+
+        // Require either no null key bounds or a pair of them
+        Preconditions.checkState(!(null == nullStart ^ null == nullEnd));
+
+        // Check that every key in the result is at least 4 bytes long
+        Map<KeyRange, ServerName> result = b.build();
+        for (KeyRange kr : result.keySet()) {
+            Preconditions.checkState(4 <= kr.getStart().length());
+            Preconditions.checkState(4 <= kr.getEnd().length());
+        }
+
+        return result;
+    }
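+    /*
+     * Worked example (illustrative only): given three regions with HBase
+     * boundaries (null, "aa"), ("aa", "bb") and ("bb", null), where "aa" is
+     * the two bytes 0x6161 and "bb" is 0x6262, this method yields the
+     * KeyRanges
+     *
+     *   [0x00000000, 0x61610000)   // null start key replaced with zeros
+     *   [0x61610000, 0x62620000)   // both keys zero-padded to 4 bytes
+     *   [0x62620000, 0x00000000)   // null end key replaced with zeros
+     */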
+
+    /**
+     * If the parameter is shorter than 4 bytes, then create and return a new 4
+     * byte array with the input array's bytes followed by zero bytes. Otherwise
+     * return the parameter.
+     *
+     * @param dataToPad non-null but possibly zero-length byte array
+     * @return either the parameter or a new array
+     */
+    private final byte[] zeroExtend(byte[] dataToPad) {
+        assert null != dataToPad;
+
+        final int targetLength = 4;
+
+        if (targetLength <= dataToPad.length)
+            return dataToPad;
+
+        byte[] padded = new byte[targetLength]; // zero-initialized by the JVM
+        System.arraycopy(dataToPad, 0, padded, 0, dataToPad.length);
+
+        return padded;
+    }
+
+    public static String shortenCfName(String longName) throws PermanentBackendException {
+        final String s;
+        if (SHORT_CF_NAME_MAP.containsKey(longName)) {
+            s = SHORT_CF_NAME_MAP.get(longName);
+            Preconditions.checkNotNull(s);
+            logger.debug("Substituted default CF name \"{}\" with short form \"{}\" to reduce HBase KeyValue size", longName, s);
+        } else {
+            if (SHORT_CF_NAME_MAP.containsValue(longName)) {
+                String fmt = "Must use CF long-form name \"%s\" instead of the short-form name \"%s\" when configured with %s=true";
+                String msg = String.format(fmt, SHORT_CF_NAME_MAP.inverse().get(longName), longName, SHORT_CF_NAMES.getName());
+                throw new PermanentBackendException(msg);
+            }
+            s = longName;
+            logger.debug("Kept default CF name \"{}\" because it has no associated short form", s);
+        }
+        return s;
+    }
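+    /*
+     * Illustrative sketch (assuming SHORT_CF_NAME_MAP, defined earlier in
+     * this class, contains an entry such as "edgestore" -> "e"):
+     *
+     *   shortenCfName("edgestore")   // returns "e"
+     *   shortenCfName("mystore")     // returns "mystore" unchanged
+     *   shortenCfName("e")           // throws PermanentBackendException
+     */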
+
+    private HTableDescriptor ensureTableExists(String tableName, String initialCFName, int ttlInSeconds) throws BackendException {
+        AdminMask adm = null;
+
+        HTableDescriptor desc;
+
+        try { // Create our table, if necessary
+            adm = getAdminInterface();
+            /*
+             * Some HBase versions/impls respond badly to attempts to create a
+             * table without at least one CF. See #661. Creating a CF along with
+             * the table avoids HBase carping.
+             */
+            if (adm.tableExists(tableName)) {
+                desc = adm.getTableDescriptor(tableName);
+            } else {
+                desc = createTable(tableName, initialCFName, ttlInSeconds, adm);
+            }
+        } catch (IOException e) {
+            throw new TemporaryBackendException(e);
+        } finally {
+            IOUtils.closeQuietly(adm);
+        }
+
+        return desc;
+    }
+
+    private HTableDescriptor createTable(String tableName, String cfName, int ttlInSeconds, AdminMask adm) throws IOException {
+        HTableDescriptor desc = compat.newTableDescriptor(tableName);
+
+        HColumnDescriptor cdesc = new HColumnDescriptor(cfName);
+        setCFOptions(cdesc, ttlInSeconds);
+
+        compat.addColumnFamilyToTableDescriptor(desc, cdesc);
+
+        int count; // total regions to create
+        String src;
+
+        if (MIN_REGION_COUNT <= (count = regionCount)) {
+            src = "region count configuration";
+        } else if (0 < regionsPerServer &&
+                   MIN_REGION_COUNT <= (count = regionsPerServer * adm.getEstimatedRegionServerCount())) {
+            src = "ClusterStatus server count";
+        } else {
+            count = -1;
+            src = "default";
+        }
+
+        if (MIN_REGION_COUNT <= count) {
+            adm.createTable(desc, getStartKey(count), getEndKey(count), count);
+            logger.debug("Created table {} with region count {} from {}", tableName, count, src);
+        } else {
+            adm.createTable(desc);
+            logger.debug("Created table {} with default start key, end key, and region count", tableName);
+        }
+
+        return desc;
+    }
+
+    /**
+     * This method generates the second argument to
+     * {@link HBaseAdmin#createTable(HTableDescriptor, byte[], byte[], int)}.
+     * <p/>
+     * From the {@code createTable} javadoc:
+     * "The start key specified will become the end key of the first region of
+     * the table, and the end key specified will become the start key of the
+     * last region of the table (the first region has a null start key and
+     * the last region has a null end key)"
+     * <p/>
+     * To summarize, the {@code createTable} argument called "startKey" is
+     * actually the end key of the first region.
+     */
+    private byte[] getStartKey(int regionCount) {
+        ByteBuffer regionWidth = ByteBuffer.allocate(4);
+        regionWidth.putInt((int)(((1L << 32) - 1L) / regionCount)).flip();
+        return StaticArrayBuffer.of(regionWidth).getBytes(0, 4);
+    }
+
+    /**
+     * Companion to {@link #getStartKey(int)}. See its javadoc for details.
+     */
+    private byte[] getEndKey(int regionCount) {
+        ByteBuffer regionWidth = ByteBuffer.allocate(4);
+        regionWidth.putInt((int)(((1L << 32) - 1L) / regionCount * (regionCount - 1))).flip();
+        return StaticArrayBuffer.of(regionWidth).getBytes(0, 4);
+    }
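+    /*
+     * Worked example (illustrative only): for regionCount = 4, the region
+     * width is (2^32 - 1) / 4 = 0x3FFFFFFF, so getStartKey(4) returns the
+     * bytes 0x3F 0xFF 0xFF 0xFF (the end of the first region) and
+     * getEndKey(4) returns 0x3FFFFFFF * 3 = 0xBF 0xFF 0xFF 0xFD (the start
+     * of the last region), splitting the 4-byte key space into four roughly
+     * equal regions.
+     */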
+
+    private void ensureColumnFamilyExists(String tableName, String columnFamily, int ttlInSeconds) throws BackendException {
+        AdminMask adm = null;
+        try {
+            adm = getAdminInterface();
+            HTableDescriptor desc = ensureTableExists(tableName, columnFamily, ttlInSeconds);
+
+            Preconditions.checkNotNull(desc);
+
+            HColumnDescriptor cf = desc.getFamily(columnFamily.getBytes());
+
+            // Create our column family, if necessary
+            if (cf == null) {
+                try {
+                    if (!adm.isTableDisabled(tableName)) {
+                        adm.disableTable(tableName);
+                    }
+                } catch (TableNotEnabledException e) {
+                    logger.debug("Table {} already disabled", tableName);
+                } catch (IOException e) {
+                    throw new TemporaryBackendException(e);
+                }
+
+                try {
+                    HColumnDescriptor cdesc = new HColumnDescriptor(columnFamily);
+
+                    setCFOptions(cdesc, ttlInSeconds);
+
+                    adm.addColumn(tableName, cdesc);
+
+                    logger.debug("Added HBase ColumnFamily {}, waiting for 1 sec. to propogate.", columnFamily);
+
+                    adm.enableTable(tableName);
+                } catch (TableNotFoundException ee) {
+                    logger.error("TableNotFoundException", ee);
+                    throw new PermanentBackendException(ee);
+                } catch (org.apache.hadoop.hbase.TableExistsException ee) {
+                    logger.debug("Swallowing exception {}", ee);
+                } catch (IOException ee) {
+                    throw new TemporaryBackendException(ee);
+                }
+            }
+        } finally {
+            IOUtils.closeQuietly(adm);
+        }
+    }
+
+    private void setCFOptions(HColumnDescriptor cdesc, int ttlInSeconds) {
+        if (null != compression && !compression.equals(COMPRESSION_DEFAULT))
+            compat.setCompression(cdesc, compression);
+
+        if (ttlInSeconds > 0)
+            cdesc.setTimeToLive(ttlInSeconds);
+
+        cdesc.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
+    }
+
+    /**
+     * Convert Titan internal Mutation representation into HBase native commands.
+     *
+     * @param mutations    Mutations to convert into HBase commands.
+     * @param putTimestamp The timestamp to use for Put commands.
+     * @param delTimestamp The timestamp to use for Delete commands.
+     * @return Commands sorted by key converted from Titan internal representation.
+     * @throws com.thinkaurelius.titan.diskstorage.PermanentBackendException
+     */
+    private Map<StaticBuffer, Pair<Put, Delete>> convertToCommands(Map<String, Map<StaticBuffer, KCVMutation>> mutations,
+                                                                   final long putTimestamp,
+                                                                   final long delTimestamp) throws PermanentBackendException {
+        Map<StaticBuffer, Pair<Put, Delete>> commandsPerKey = new HashMap<StaticBuffer, Pair<Put, Delete>>();
+
+        for (Map.Entry<String, Map<StaticBuffer, KCVMutation>> entry : mutations.entrySet()) {
+
+            String cfString = getCfNameForStoreName(entry.getKey());
+            byte[] cfName = cfString.getBytes();
+
+            for (Map.Entry<StaticBuffer, KCVMutation> m : entry.getValue().entrySet()) {
+                byte[] key = m.getKey().as(StaticBuffer.ARRAY_FACTORY);
+                KCVMutation mutation = m.getValue();
+
+                Pair<Put, Delete> commands = commandsPerKey.get(m.getKey());
+
+                if (commands == null) {
+                    commands = new Pair<Put, Delete>();
+                    commandsPerKey.put(m.getKey(), commands);
+                }
+
+                if (mutation.hasDeletions()) {
+                    if (commands.getSecond() == null) {
+                        Delete d = new Delete(key);
+                        compat.setTimestamp(d, delTimestamp);
+                        commands.setSecond(d);
+                    }
+
+                    for (StaticBuffer b : mutation.getDeletions()) {
+                        commands.getSecond().deleteColumns(cfName, b.as(StaticBuffer.ARRAY_FACTORY), delTimestamp);
+                    }
+                }
+
+                if (mutation.hasAdditions()) {
+                    if (commands.getFirst() == null) {
+                        Put p = new Put(key, putTimestamp);
+                        commands.setFirst(p);
+                    }
+
+                    for (Entry e : mutation.getAdditions()) {
+                        commands.getFirst().add(cfName,
+                                e.getColumnAs(StaticBuffer.ARRAY_FACTORY),
+                                putTimestamp,
+                                e.getValueAs(StaticBuffer.ARRAY_FACTORY));
+                    }
+                }
+            }
+        }
+
+        return commandsPerKey;
+    }
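+    /*
+     * Illustrative sketch: a KCVMutation for row key "k" in store
+     * "edgestore" that adds column "c1" with value "v" and deletes column
+     * "c2" becomes one Put ("k", cf, "c1", putTimestamp, "v") and one
+     * Delete covering ("k", cf, "c2") at delTimestamp, paired under "k" so
+     * that mutateMany() can submit both in a single batch() round trip.
+     */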
+
+    private String getCfNameForStoreName(String storeName) throws PermanentBackendException {
+        return shortCfNames ? shortenCfName(storeName) : storeName;
+    }
+
+    private void checkConfigDeprecation(com.thinkaurelius.titan.diskstorage.configuration.Configuration config) {
+        if (config.has(GraphDatabaseConfiguration.STORAGE_PORT)) {
+            logger.warn("The configuration property {} is ignored for HBase. Set hbase.zookeeper.property.clientPort in hbase-site.xml or {}.hbase.zookeeper.property.clientPort in Titan's configuration file.",
+                    ConfigElement.getPath(GraphDatabaseConfiguration.STORAGE_PORT), ConfigElement.getPath(HBASE_CONFIGURATION_NAMESPACE));
+        }
+    }
+
+    private AdminMask getAdminInterface() {
+        try {
+            return cnx.getAdmin();
+        } catch (IOException e) {
+            throw new TitanException(e);
+        }
+    }
+
+    /**
+     * Similar to {@link Function}, except that the {@code apply} method is allowed
+     * to throw {@link BackendException}.
+     */
+    private interface BackendFunction<F, T> {
+
+        T apply(F input) throws BackendException;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseTransaction.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseTransaction.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseTransaction.java
new file mode 100644
index 0000000..e13593f
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseTransaction.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig;
+import com.thinkaurelius.titan.diskstorage.StaticBuffer;
+import com.thinkaurelius.titan.diskstorage.common.AbstractStoreTransaction;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
+import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator;
+import com.thinkaurelius.titan.diskstorage.util.KeyColumn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * A transaction type specific to HBase. It tracks the key-column locks
+ * acquired through the {@link LocalLockMediator} and releases them on
+ * commit or rollback; having an HBase-specific type also lets us check for
+ * user errors like passing a Cassandra transaction into an HBase method.
+ *
+ * @author Dan LaRocque <da...@hopcount.org>
+ */
+public class HBaseTransaction extends AbstractStoreTransaction {
+
+    private static final Logger log = LoggerFactory.getLogger(HBaseTransaction.class);
+
+    private final LocalLockMediator<StoreTransaction> llm;
+
+    private final Set<KeyColumn> keyColumnLocks = new LinkedHashSet<>();
+
+    public HBaseTransaction(final BaseTransactionConfig config, LocalLockMediator<StoreTransaction> llm) {
+        super(config);
+        this.llm = llm;
+    }
+
+    @Override
+    public synchronized void rollback() throws BackendException {
+        super.rollback();
+        log.debug("Rolled back transaction");
+        deleteAllLocks();
+    }
+
+    @Override
+    public synchronized void commit() throws BackendException {
+        super.commit();
+        log.debug("Committed transaction");
+        deleteAllLocks();
+    }
+
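+    /**
+     * Record a lock acquired under this transaction so that commit() or
+     * rollback() can later release it. The {@code expectedValue} parameter
+     * is not used by this implementation.
+     */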
+    public void updateLocks(KeyColumn lockID, StaticBuffer expectedValue) {
+        keyColumnLocks.add(lockID);
+    }
+
+    private void deleteAllLocks() {
+        for (KeyColumn kc : keyColumnLocks) {
+            llm.unlock(kc, this);
+            log.debug("Removed lock {}", kc);
+        }
+    }
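+
+    /*
+     * Sketch of the intended flow (beginTransaction() comes from
+     * HBaseStoreManager in this patch; the locking call site is assumed to
+     * be HBaseKeyColumnValueStore's lock-acquisition path):
+     *
+     *   HBaseTransaction tx = (HBaseTransaction) manager.beginTransaction(config);
+     *   tx.updateLocks(lockID, expectedValue);  // lock tracked locally
+     *   tx.commit();                            // also unlocks lockID via the mediator
+     */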
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection0_98.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection0_98.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection0_98.java
new file mode 100644
index 0000000..8660644
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection0_98.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+
+public class HConnection0_98 implements ConnectionMask
+{
+
+    private final HConnection cnx;
+
+    public HConnection0_98(HConnection cnx)
+    {
+        this.cnx = cnx;
+    }
+
+    @Override
+    public TableMask getTable(String name) throws IOException
+    {
+        return new HTable0_98(cnx.getTable(name));
+    }
+
+    @Override
+    public AdminMask getAdmin() throws IOException
+    {
+        return new HBaseAdmin0_98(new HBaseAdmin(cnx));
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        cnx.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection1_0.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection1_0.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection1_0.java
new file mode 100644
index 0000000..91e5026
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection1_0.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+
+public class HConnection1_0 implements ConnectionMask
+{
+
+    private final Connection cnx;
+
+    public HConnection1_0(Connection cnx)
+    {
+        this.cnx = cnx;
+    }
+
+    @Override
+    public TableMask getTable(String name) throws IOException
+    {
+        return new HTable1_0(cnx.getTable(TableName.valueOf(name)));
+    }
+
+    @Override
+    public AdminMask getAdmin() throws IOException
+    {
+        return new HBaseAdmin1_0(new HBaseAdmin(cnx));
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        cnx.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable0_98.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable0_98.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable0_98.java
new file mode 100644
index 0000000..4ddb2f0
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable0_98.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.Scan;
+
+public class HTable0_98 implements TableMask
+{
+    private final HTableInterface table;
+
+    public HTable0_98(HTableInterface table)
+    {
+        this.table = table;
+    }
+
+    @Override
+    public ResultScanner getScanner(Scan filter) throws IOException
+    {
+        return table.getScanner(filter);
+    }
+
+    @Override
+    public Result[] get(List<Get> gets) throws IOException
+    {
+        return table.get(gets);
+    }
+
+    @Override
+    public void batch(List<Row> writes, Object[] results) throws IOException, InterruptedException
+    {
+        table.batch(writes, results);
+        table.flushCommits();
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        table.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable1_0.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable1_0.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable1_0.java
new file mode 100644
index 0000000..5085abb
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable1_0.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+
+public class HTable1_0 implements TableMask
+{
+    private final Table table;
+
+    public HTable1_0(Table table)
+    {
+        this.table = table;
+    }
+
+    @Override
+    public ResultScanner getScanner(Scan filter) throws IOException
+    {
+        return table.getScanner(filter);
+    }
+
+    @Override
+    public Result[] get(List<Get> gets) throws IOException
+    {
+        return table.get(gets);
+    }
+
+    @Override
+    public void batch(List<Row> writes, Object[] results) throws IOException, InterruptedException
+    {
+        table.batch(writes, results);
+        /* table.flushCommits(); not needed anymore */
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        table.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/TableMask.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/TableMask.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/TableMask.java
new file mode 100644
index 0000000..dd3d61e
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/TableMask.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.Scan;
+
+/**
+ * This interface hides ABI/API breaking changes that HBase has made to its Table/HTableInterface over the course
+ * of development from 0.94 to 1.0 and beyond.
+ */
+public interface TableMask extends Closeable
+{
+
+    ResultScanner getScanner(Scan filter) throws IOException;
+
+    Result[] get(List<Get> gets) throws IOException;
+
+    void batch(List<Row> writes, Object[] results) throws IOException, InterruptedException;
+
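+    /*
+     * Typical use (a sketch, mirroring HBaseStoreManager.mutateMany() in
+     * this patch): callers obtain a TableMask from ConnectionMask.getTable()
+     * and never touch the version-specific HTableInterface/Table directly:
+     *
+     *   TableMask table = cnx.getTable(tableName);
+     *   try {
+     *       table.batch(rows, new Object[rows.size()]);
+     *   } finally {
+     *       IOUtils.closeQuietly(table);
+     *   }
+     */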
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9edf9e52/titan/src/main/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediator.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediator.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediator.java
new file mode 100644
index 0000000..20c59e1
--- /dev/null
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediator.java
@@ -0,0 +1,345 @@
+/*
+ * Copyright 2012-2013 Aurelius LLC
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.thinkaurelius.titan.diskstorage.locking;
+
+import com.google.common.base.Preconditions;
+import com.thinkaurelius.titan.diskstorage.util.time.Timepoint;
+import com.thinkaurelius.titan.diskstorage.util.time.TimestampProvider;
+import com.thinkaurelius.titan.diskstorage.locking.consistentkey.ExpectedValueCheckingTransaction;
+import com.thinkaurelius.titan.diskstorage.util.KeyColumn;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class resolves lock contention between two transactions on the same JVM.
+ * <p/>
+ * This is not just an optimization to reduce network traffic. Locks written by
+ * Titan to a distributed key-value store contain an identifier, the "Rid",
+ * which is unique only to the process level. The Rid can't tell which
+ * transaction in a process holds any given lock. This class prevents two
+ * transactions in a single process from concurrently writing the same lock to a
+ * distributed key-value store.
+ *
+ * @author Dan LaRocque <da...@hopcount.org>
+ */
+
+public class LocalLockMediator<T> {
+
+    private static final Logger log = LoggerFactory
+        .getLogger(LocalLockMediator.class);
+
+    /**
+     * Namespace for which this mediator is responsible
+     *
+     * @see LocalLockMediatorProvider
+     */
+    private final String name;
+
+    private final TimestampProvider times;
+
+    private final DelayQueue<ExpirableKeyColumn> expiryQueue = new DelayQueue<>();
+
+    private ExecutorService lockCleanerService = Executors.newFixedThreadPool(1, new ThreadFactory() {
+        @Override
+        public Thread newThread(Runnable runnable) {
+            Thread thread = Executors.defaultThreadFactory().newThread(runnable);
+            thread.setDaemon(true);
+            return thread;
+        }
+    });
+
+
+
+    /**
+     * Maps a ({@code key}, {@code column}) pair to the local transaction
+     * holding a lock on that pair. Values in this map may have already expired
+     * according to {@link AuditRecord#expires}, in which case the lock should
+     * be considered invalid.
+     */
+    private final ConcurrentHashMap<KeyColumn, AuditRecord<T>> locks = new ConcurrentHashMap<KeyColumn, AuditRecord<T>>();
+
+    public LocalLockMediator(String name, TimestampProvider times) {
+        this.name = Preconditions.checkNotNull(name);
+        this.times = Preconditions.checkNotNull(times);
+
+        lockCleanerService.submit(new LockCleaner());
+    }
+
+    /**
+     * Acquire the lock specified by {@code kc}.
+     * <p/>
+     * <p/>
+     * For any particular key-column, whatever value of {@code requestor} is
+     * passed to this method must also be passed to the associated later call to
+     * {@link #unlock(KeyColumn, Object)}.
+     * <p/>
+     * If some requestor {@code r} calls this method on a KeyColumn {@code k}
+     * and this method returns true, then subsequent calls to this method by
+     * {@code r} on {@code k} merely attempt to update the {@code expiresAt}
+     * timestamp. This differs from typical lock reentrance: multiple successful
+     * calls to this method do not require an equal number of calls to
+     * {@code #unlock()}. One {@code #unlock()} call is enough, no matter how
+     * many times a {@code requestor} called {@code lock} beforehand. Note that
+     * updating the timestamp may fail, in which case the lock is considered to
+     * have expired and the calling context should assume it no longer holds the
+     * lock specified by {@code kc}.
+     * <p/>
+     * The number of nanoseconds elapsed since the UNIX Epoch is not readily
+     * available within the JVM. When reckoning expiration times, this method
+     * uses the approximation implemented by
+     * {@code com.thinkaurelius.titan.diskstorage.util.NanoTime#getApproxNSSinceEpoch(false)}.
+     * <p/>
+     * The current implementation of this method returns true when given an
+     * {@code expiresAt} argument in the past. Future implementations may return
+     * false instead.
+     *
+     * @param kc        lock identifier
+     * @param requestor the object locking {@code kc}
+     * @param expires   instant at which this lock will automatically expire
+     * @return true if the lock is acquired, false if it was not acquired
+     */
+    public boolean lock(KeyColumn kc, T requestor, Timepoint expires) {
+        assert null != kc;
+        assert null != requestor;
+
+        AuditRecord<T> audit = new AuditRecord<T>(requestor, expires);
+        AuditRecord<T> inmap = locks.putIfAbsent(kc, audit);
+
+        boolean success = false;
+
+        if (null == inmap) {
+            // Uncontended lock succeeded
+            if (log.isTraceEnabled()) {
+                log.trace("New local lock created: {} namespace={} txn={}",
+                    new Object[]{kc, name, requestor});
+            }
+            success = true;
+        } else if (inmap.equals(audit)) {
+            // requestor has already locked kc; update expiresAt
+            success = locks.replace(kc, inmap, audit);
+            if (log.isTraceEnabled()) {
+                if (success) {
+                    log.trace(
+                        "Updated local lock expiration: {} namespace={} txn={} oldexp={} newexp={}",
+                        new Object[]{kc, name, requestor, inmap.expires,
+                            audit.expires});
+                } else {
+                    log.trace(
+                        "Failed to update local lock expiration: {} namespace={} txn={} oldexp={} newexp={}",
+                        new Object[]{kc, name, requestor, inmap.expires,
+                            audit.expires});
+                }
+            }
+        } else if (0 > inmap.expires.compareTo(times.getTime())) {
+            // the recorded lock has expired; replace it
+            success = locks.replace(kc, inmap, audit);
+            if (log.isTraceEnabled()) {
+                log.trace(
+                    "Discarding expired lock: {} namespace={} txn={} expired={}",
+                    new Object[]{kc, name, inmap.holder, inmap.expires});
+            }
+        } else {
+            // we lost to a valid lock
+            if (log.isTraceEnabled()) {
+                log.trace(
+                    "Local lock failed: {} namespace={} txn={} (already owned by {})",
+                    new Object[]{kc, name, requestor, inmap});
+            }
+        }
+
+        if (success) {
+            expiryQueue.add(new ExpirableKeyColumn(kc, expires));
+        }
+        return success;
+    }
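+
+    /*
+     * Illustrative sketch of the reentrance semantics described above
+     * (test-style, not part of this class):
+     *
+     *   llm.lock(kc, tx, expiresLater);       // true: new lock
+     *   llm.lock(kc, tx, expiresEvenLater);   // true: same holder, refreshes expiry
+     *   llm.lock(kc, otherTx, expires);       // false while tx's lock is unexpired
+     *   llm.unlock(kc, tx);                   // a single unlock releases it
+     */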
+
+    /**
+     * Release the lock specified by {@code kc} and which was previously
+     * locked by {@code requestor}, if it is possible to release it.
+     *
+     * @param kc        lock identifier
+     * @param requestor the object which previously locked {@code kc}
+     */
+    public boolean unlock(KeyColumn kc, T requestor) {
+
+        if (!locks.containsKey(kc)) {
+            log.info("Local unlock failed: no locks found for {}", kc);
+            return false;
+        }
+
+        AuditRecord<T> unlocker = new AuditRecord<T>(requestor, null);
+
+        AuditRecord<T> holder = locks.get(kc);
+
+        if (!holder.equals(unlocker)) {
+            log.error("Local unlock of {} by {} failed: it is held by {}",
+                new Object[]{kc, unlocker, holder});
+            return false;
+        }
+
+        boolean removed = locks.remove(kc, unlocker);
+
+        if (removed) {
+            // DelayQueue.remove(Object) matches via equals(), so we must find
+            // the ExpirableKeyColumn wrapping this KeyColumn; passing the
+            // KeyColumn itself would never match.
+            for (Iterator<ExpirableKeyColumn> it = expiryQueue.iterator(); it.hasNext(); ) {
+                if (it.next().getKeyColumn().equals(kc)) {
+                    it.remove();
+                }
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("Local unlock succeeded: {} namespace={} txn={}",
+                    new Object[]{kc, name, requestor});
+            }
+        } else {
+            log.warn("Local unlock warning: lock record for {} disappeared "
+                + "during removal; this suggests the lock either expired "
+                + "while we were removing it, or that it was erroneously "
+                + "unlocked multiple times.", kc);
+        }
+
+        // Even if !removed, we're finished unlocking, so return true
+        return true;
+    }
+
+    public String toString() {
+        return "LocalLockMediator [" + name + ",  ~" + locks.size()
+            + " current locks]";
+    }
+
+    /**
+     * A record containing the local transaction that holds a lock and the
+     * lock's expiration time.
+     */
+    private static class AuditRecord<T> {
+
+        /**
+         * The local transaction that holds/held the lock.
+         */
+        private final T holder;
+        /**
+         * The expiration time of the lock.
+         */
+        private final Timepoint expires;
+        /**
+         * Cached hashCode.
+         */
+        private int hashCode;
+
+        private AuditRecord(T holder, Timepoint expires) {
+            this.holder = holder;
+            this.expires = expires;
+        }
+
+        /**
+         * This implementation depends only on the lock holder and not on the
+         * lock expiration time.
+         */
+        @Override
+        public int hashCode() {
+            if (0 == hashCode)
+                hashCode = holder.hashCode();
+
+            return hashCode;
+        }
+
+        /**
+         * This implementation depends only on the lock holder and not on the
+         * lock expiration time.
+         */
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (obj == null)
+                return false;
+            if (getClass() != obj.getClass())
+                return false;
+            /*
+             * This warning suppression is harmless because we are only going to
+             * call other.holder.equals(...), and since equals(...) is part of
+             * Object, it is guaranteed to be defined no matter the concrete
+             * type of parameter T.
+             */
+            @SuppressWarnings("rawtypes")
+            AuditRecord other = (AuditRecord) obj;
+            if (holder == null) {
+                if (other.holder != null)
+                    return false;
+            } else if (!holder.equals(other.holder))
+                return false;
+            return true;
+        }
+
+        @Override
+        public String toString() {
+            return "AuditRecord [txn=" + holder + ", expires=" + expires + "]";
+        }
+
+    }
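+
+    /*
+     * Note: because AuditRecord equality ignores the expiration time,
+     * refreshing a lock above is simply locks.replace(kc, inmap, audit):
+     * the two records compare equal on holder, so the map atomically swaps
+     * in the record carrying the new expiration.
+     */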
+
+    private class LockCleaner implements Runnable {
+
+        @Override
+        public void run() {
+            log.debug("Lock Cleaner service started");
+            try {
+                while (true) {
+                    ExpirableKeyColumn lock = expiryQueue.take();
+                    log.debug("Expiring key column {}", lock.getKeyColumn());
+                    locks.remove(lock.getKeyColumn());
+                }
+            } catch (InterruptedException e) {
+                log.debug("Received interrupt. Exiting");
+            }
+        }
+    }
+
+    private static class ExpirableKeyColumn implements Delayed {
+
+        private final Timepoint expiryTime;
+        private final KeyColumn kc;
+
+        public ExpirableKeyColumn(KeyColumn keyColumn, Timepoint expiryTime) {
+            this.kc = keyColumn;
+            this.expiryTime = expiryTime;
+        }
+
+        @Override
+        public long getDelay(TimeUnit unit) {
+            // Return the time remaining until expiry rather than the absolute
+            // expiration timestamp; DelayQueue expects a countdown that
+            // reaches zero when the element becomes available to take().
+            long remainingNanos = expiryTime.getTimestamp(TimeUnit.NANOSECONDS)
+                    - TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis());
+            return unit.convert(remainingNanos, TimeUnit.NANOSECONDS);
+        }
+
+        @Override
+        public int compareTo(Delayed o) {
+            if (this.expiryTime.getTimestamp(TimeUnit.NANOSECONDS) < ((ExpirableKeyColumn) o).expiryTime.getTimestamp(TimeUnit.NANOSECONDS)) {
+                return -1;
+            }
+            if (this.expiryTime.getTimestamp(TimeUnit.NANOSECONDS) > ((ExpirableKeyColumn) o).expiryTime.getTimestamp(TimeUnit.NANOSECONDS)) {
+                return 1;
+            }
+            return 0;
+        }
+
+        public KeyColumn getKeyColumn() {
+            return kc;
+        }
+    }
+}