You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by to...@apache.org on 2011/12/21 07:43:05 UTC

svn commit: r1221635 [1/2] - in /incubator/hcatalog/trunk: ./ storage-drivers/hbase/ storage-drivers/hbase/if/ storage-drivers/hbase/ivy/ storage-drivers/hbase/src/gen-java/ storage-drivers/hbase/src/gen-java/org/ storage-drivers/hbase/src/gen-java/org...

Author: toffer
Date: Wed Dec 21 07:43:04 2011
New Revision: 1221635

URL: http://svn.apache.org/viewvc?rev=1221635&view=rev
Log:
HCATALOG-189 : ZooKeeper based revision manager for HBase Storage Driver (avandana via toffer)

Added:
    incubator/hcatalog/trunk/storage-drivers/hbase/if/
    incubator/hcatalog/trunk/storage-drivers/hbase/if/transaction.thrift
    incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/
    incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/
    incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/
    incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/
    incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/
    incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/
    incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/
    incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/
    incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevision.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevisionList.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/IDGenerator.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/PathUtil.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/LockListener.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ProtocolSupport.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/WriteLock.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZNodeName.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZooKeeperOperation.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/IDGenClient.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestIDGenerator.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestThriftSerialization.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/WriteLockTest.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/ZNodeNameTest.java
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/storage-drivers/hbase/build.xml
    incubator/hcatalog/trunk/storage-drivers/hbase/ivy.xml
    incubator/hcatalog/trunk/storage-drivers/hbase/ivy/libraries.properties

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1221635&r1=1221634&r2=1221635&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Wed Dec 21 07:43:04 2011
@@ -23,6 +23,8 @@ Trunk (unreleased changes)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+  HCAT-189. ZooKeeper based revision manager for HBase Storage Driver (avandana via toffer)
+
   HCAT-145. Add support for binary data type (hashutosh)  
 
   HCAT-151. Fixed native table names used for tables stored in non-default DBs in HBaseInputStorageDriver (avandana via toffer)

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/build.xml
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/build.xml?rev=1221635&r1=1221634&r2=1221635&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/build.xml (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/build.xml Wed Dec 21 07:43:04 2011
@@ -35,7 +35,6 @@
     <property name="driver.version" value="0.1.0"/>
     <property name="driver.jar" value="${ant.project.name}-${driver.version}.jar"/>
     <property name="final.name" value="${ant.project.name}-${driver.version}" />
-
     <property name="hcatalog.dir" value="${basedir}/../../" />
 
     <!-- hive properties -->
@@ -65,6 +64,8 @@
     <property name="test.build.dir" value="${build.dir}/test" />
     <property name="test.build.classes" value="${test.build.dir}/classes" />
     <property name="test.log.dir" value="${test.build.dir}/logs" />
+    <property name="test.tmp.dir" value="${test.build.dir}/temp" />
+    <property name="test.data.dir" value="${test.build.dir}/data" />
     <property name="test.timeout" value="2700000" />
     <property name="test.junit.output.format" value="plain" />
     <property name="test.all.file" value="${test.src.dir}/all-tests"/>
@@ -110,10 +111,37 @@
         <pathelement location="${build.classes}" />
         <pathelement location="conf"/>
         <pathelement location="${hive.conf.dir}"/>
+        <!-- jars Thrift depends on -->
+        <fileset dir="${ivy.lib.dir}" includes="libthrift*.jar"/>
+        <!-- jars Hive depends on -->
+        <fileset dir="${hive.root}/build/ivy/lib/default/">
+            <include name="**/*.jar" />
+            <exclude name="*hbase*.jar" />
+        </fileset>
+        <!-- jars Hadoop depends on -->
+        <fileset dir="${hive.root}/build/hadoopcore/hadoop-0.20.3-CDH3-SNAPSHOT/lib/" >
+            <include name="**/*.jar" />
+        </fileset>
+        <pathelement location="${driver.jar}"/>
+        <path refid="classpath"/>
+        <fileset dir="${hive.root}/build/hadoopcore/hadoop-0.20.3-CDH3-SNAPSHOT/"
+                 includes="hadoop-test-0.20.3-CDH3-SNAPSHOT.jar"/>
+    </path>
+
+    <path id="test.zookeeper.classpath">
+        <pathelement location="${test.build.classes}" />
+        <pathelement location="${build.classes}" />
+        <pathelement location="conf"/>
+        <pathelement location="${hive.conf.dir}"/>
+        <!-- jars Zookeeper depends on -->
+        <fileset dir="${ivy.lib.dir}" includes="zookeeper*.jar"/>
+        <!-- jars Thrift depends on -->
+        <fileset dir="${ivy.lib.dir}" includes="libthrift*.jar"/>
         <!-- jars Hive depends on -->
         <fileset dir="${hive.root}/build/ivy/lib/default/">
             <include name="**/*.jar" />
             <exclude name="*hbase*.jar" />
+            <exclude name="*zookeeper*.jar" />
         </fileset>
         <!-- jars Hadoop depends on -->
         <fileset dir="${hive.root}/build/hadoopcore/hadoop-0.20.3-CDH3-SNAPSHOT/lib/" >
@@ -195,7 +223,6 @@
         <mkdir dir="${build.classes}" />
         <mkdir dir="${test.build.classes}" />
     </target>
-
     <!--
     ================================================================================
     Main Build and Jar Section
@@ -203,7 +230,7 @@
     -->
     <!-- Compile src files -->
     <target name="compile-src" depends="init">
-        <javac encoding="${build.encoding}" srcdir="${src.dir}" excludes="${excludes}"
+        <javac encoding="${build.encoding}" srcdir="${src.dir}:${basedir}/src/gen-java" excludes="${excludes}"
                includes="**/*.java" destdir="${build.classes}" debug="${javac.debug}"
                optimize="${javac.optimize}" target="${javac.version}"
                source="${javac.version}" deprecation="${javac.deprecation}"
@@ -240,8 +267,12 @@
         <sequential>
 
             <delete dir="${test.log.dir}"/>
+	    <delete dir="${test.tmp.dir}" />
+            <delete dir="${test.data.dir}" />
             <mkdir dir="${test.log.dir}"/>
-            <junit showoutput="${test.output}" printsummary="yes" haltonfailure="no"
+	    <mkdir dir="${test.tmp.dir}" />
+            <mkdir dir="${test.data.dir}" />
+	    <junit showoutput="${test.output}" printsummary="yes" haltonfailure="no"
                    fork="yes" maxmemory="512m" dir="${basedir}" timeout="${test.timeout}"
                    errorProperty="tests.failed" failureProperty="tests.failed">
                 <classpath>
@@ -261,7 +292,7 @@
                 </batchtest>
                 <!-- Run one test case.  To use this define -Dtestcase=X on the command line -->
                 <batchtest fork="yes" todir="${test.log.dir}" if="testcase">
-                    <fileset dir="src/test" includes="**/${testcase}.java"/>
+                    <fileset dir="src/test" includes="**/${testcase}.java" excludes="**/lock/*.java"/>
                 </batchtest>
 
                 <assertions>
@@ -269,6 +300,31 @@
                 </assertions>
 
             </junit>
+           <!-- TODO: Remove the following when using HBase 0.92 -->
+            <junit showoutput="${test.output}" printsummary="yes" haltonfailure="no"
+                   fork="yes" maxmemory="512m" dir="${basedir}" timeout="${test.timeout}"
+                   errorProperty="tests.failed" failureProperty="tests.failed">
+            	<sysproperty key="build.test.dir" value="${test.tmp.dir}" />
+            	<sysproperty key="test.data.dir" value="${test.data.dir}" />
+            	<sysproperty key="log4j.configuration" value="file:${basedir}/log4j.properties" />
+                <classpath>
+                    <pathelement location="${test.build.classes}" />
+                    <pathelement location="." />
+                    <path refid="test.zookeeper.classpath"/>
+                </classpath>
+                <formatter type="${test.junit.output.format}" />
+                <batchtest fork="yes" todir="${test.log.dir}" unless="testcase">                
+                    <fileset dir="src/test" includes="**/lock/*Test.java"/>
+                </batchtest>
+                <!-- Run one test case.  To use this define -Dtestcase=X on the command line -->
+                <batchtest fork="yes" todir="${test.log.dir}" if="testcase">
+                    <fileset dir="src/test" includes="**/lock/${testcase}.java"/>
+                </batchtest>                
+                <assertions>
+                    <enable />
+                </assertions>
+
+            </junit>
             <fail if="tests.failed">Tests failed!</fail>
         </sequential>
     </target>
@@ -325,8 +381,10 @@
                 <include name="**/*"/>
                 <!-- exclude test jars -->
                 <exclude name="*-tests.jar"/>
+                <exclude name="*zookeeper*.jar"/>
             </fileset>
             <fileset dir="${hive.root}/build/hbase-handler" includes="*.jar"/>
+            <fileset dir="${hive.root}/build/ivy/lib/default" includes="*zookeeper*.jar"/>
         </copy>
 
 

Added: incubator/hcatalog/trunk/storage-drivers/hbase/if/transaction.thrift
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/if/transaction.thrift?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/if/transaction.thrift (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/if/transaction.thrift Wed Dec 21 07:43:04 2011
@@ -0,0 +1,11 @@
+namespace java org.apache.hcatalog.hbase.snapshot.transaction.thrift
+namespace cpp Apache.HCatalog.HBase
+
+// Revision number and timestamp recorded for a column family in a transaction.
+struct StoreFamilyRevision {
+  1: i64 revision,
+  2: i64 timestamp
+}
+
+// List of StoreFamilyRevision entries kept for a column family.
+struct StoreFamilyRevisionList {
+  1: list<StoreFamilyRevision> revisionList
+}

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/ivy.xml
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/ivy.xml?rev=1221635&r1=1221634&r2=1221635&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/ivy.xml (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/ivy.xml Wed Dec 21 07:43:04 2011
@@ -41,6 +41,11 @@
             <artifact name="hbase" type="jar" ext="jar"/>
             <artifact name="hbase" type="test-jar" ext="jar" m:classifier="tests"/>
         </dependency>
-        <dependency org="org.apache.zookeeper" name="zookeeper" rev="${zookeeper.version}" conf="common->master"/>
+        <dependency org="org.apache.zookeeper" name="zookeeper" rev="${zookeeper.version}"
+                    conf="common->master">
+            <artifact name="zookeeper" type="jar" ext="jar"/>
+            <artifact name="zookeeper" type="test-jar" ext="jar" m:classifier="tests"/>
+        </dependency>
+	<dependency org="org.apache.thrift" name="libthrift" rev="${thrift.version}" conf="common->master"/>
     </dependencies>
 </ivy-module>

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/ivy/libraries.properties?rev=1221635&r1=1221634&r2=1221635&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/ivy/libraries.properties (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/ivy/libraries.properties Wed Dec 21 07:43:04 2011
@@ -17,4 +17,5 @@ junit.version=3.8.1
 ivy.version=2.2.0
 rats-lib.version=0.5.1
 hbase.version=0.90.3
-zookeeper.version=3.3.1
+zookeeper.version=3.4.0
+thrift.version=0.7.0

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevision.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevision.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevision.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevision.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This class is used to store the revision and timestamp of a column family
+ * in a transaction.
+ *
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.7.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package org.apache.hcatalog.hbase.snapshot.transaction.thrift;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thrift struct that pairs a revision number with a timestamp.
+ *
+ * <p>Both fields are primitive {@code long}s. Whether each one has been
+ * explicitly assigned is tracked separately in an internal bit set and is
+ * exposed through {@link #isSetRevision()} and {@link #isSetTimestamp()}.
+ * {@link #equals(StoreFamilyRevision)} and {@link #hashCode()} look only at
+ * the field values, while {@link #compareTo(StoreFamilyRevision)} also takes
+ * the "is set" flags into account.
+ *
+ * <p>The class was emitted by the Thrift compiler; manual changes have to be
+ * reapplied if it is regenerated.
+ */
+public class StoreFamilyRevision implements org.apache.thrift.TBase<StoreFamilyRevision, StoreFamilyRevision._Fields>, java.io.Serializable, Cloneable {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("StoreFamilyRevision");
+
+  private static final org.apache.thrift.protocol.TField REVISION_FIELD_DESC = new org.apache.thrift.protocol.TField("revision", org.apache.thrift.protocol.TType.I64, (short)1);
+  private static final org.apache.thrift.protocol.TField TIMESTAMP_FIELD_DESC = new org.apache.thrift.protocol.TField("timestamp", org.apache.thrift.protocol.TType.I64, (short)2);
+
+  /** Revision number (Thrift field id 1). */
+  public long revision; // required
+  /** Timestamp (Thrift field id 2). */
+  public long timestamp; // required
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    REVISION((short)1, "revision"),
+    TIMESTAMP((short)2, "timestamp");
+
+    /** Lookup table from Thrift field name to enum constant. */
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if it is not found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // REVISION
+          return REVISION;
+        case 2: // TIMESTAMP
+          return TIMESTAMP;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an
+     * IllegalArgumentException if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if it is not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    /** Returns the numeric Thrift id of this field. */
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    /** Returns the Thrift name of this field. */
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments: one bit per primitive field in __isset_bit_vector
+  private static final int __REVISION_ISSET_ID = 0;
+  private static final int __TIMESTAMP_ISSET_ID = 1;
+  private BitSet __isset_bit_vector = new BitSet(2);
+
+  /** Unmodifiable per-field metadata, also registered with Thrift's FieldMetaData. */
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.REVISION, new org.apache.thrift.meta_data.FieldMetaData("revision", org.apache.thrift.TFieldRequirementType.DEFAULT,
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+    tmpMap.put(_Fields.TIMESTAMP, new org.apache.thrift.meta_data.FieldMetaData("timestamp", org.apache.thrift.TFieldRequirementType.DEFAULT,
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(StoreFamilyRevision.class, metaDataMap);
+  }
+
+  /** Creates an instance with both fields at 0 and neither marked as set. */
+  public StoreFamilyRevision() {
+  }
+
+  /**
+   * Creates an instance with both fields assigned and marked as set.
+   *
+   * @param revision the revision number
+   * @param timestamp the timestamp
+   */
+  public StoreFamilyRevision(
+    long revision,
+    long timestamp)
+  {
+    this();
+    this.revision = revision;
+    setRevisionIsSet(true);
+    this.timestamp = timestamp;
+    setTimestampIsSet(true);
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>, including its "is set" flags.
+   */
+  public StoreFamilyRevision(StoreFamilyRevision other) {
+    __isset_bit_vector.clear();
+    __isset_bit_vector.or(other.__isset_bit_vector);
+    this.revision = other.revision;
+    this.timestamp = other.timestamp;
+  }
+
+  /** Returns a new instance built with the copy constructor. */
+  public StoreFamilyRevision deepCopy() {
+    return new StoreFamilyRevision(this);
+  }
+
+  /** Resets both fields to 0 and marks both as not set. */
+  @Override
+  public void clear() {
+    setRevisionIsSet(false);
+    this.revision = 0;
+    setTimestampIsSet(false);
+    this.timestamp = 0;
+  }
+
+  /** Returns the current revision value (0 if it was never assigned). */
+  public long getRevision() {
+    return this.revision;
+  }
+
+  /**
+   * Assigns the revision and marks it as set.
+   *
+   * @return this instance, to allow call chaining
+   */
+  public StoreFamilyRevision setRevision(long revision) {
+    this.revision = revision;
+    setRevisionIsSet(true);
+    return this;
+  }
+
+  /** Clears the "is set" flag of revision; the stored value is left untouched. */
+  public void unsetRevision() {
+    __isset_bit_vector.clear(__REVISION_ISSET_ID);
+  }
+
+  /** Returns true if field revision is set (has been assigned a value) and false otherwise */
+  public boolean isSetRevision() {
+    return __isset_bit_vector.get(__REVISION_ISSET_ID);
+  }
+
+  /** Forces the "is set" flag of revision to the given value. */
+  public void setRevisionIsSet(boolean value) {
+    __isset_bit_vector.set(__REVISION_ISSET_ID, value);
+  }
+
+  /** Returns the current timestamp value (0 if it was never assigned). */
+  public long getTimestamp() {
+    return this.timestamp;
+  }
+
+  /**
+   * Assigns the timestamp and marks it as set.
+   *
+   * @return this instance, to allow call chaining
+   */
+  public StoreFamilyRevision setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+    setTimestampIsSet(true);
+    return this;
+  }
+
+  /** Clears the "is set" flag of timestamp; the stored value is left untouched. */
+  public void unsetTimestamp() {
+    __isset_bit_vector.clear(__TIMESTAMP_ISSET_ID);
+  }
+
+  /** Returns true if field timestamp is set (has been assigned a value) and false otherwise */
+  public boolean isSetTimestamp() {
+    return __isset_bit_vector.get(__TIMESTAMP_ISSET_ID);
+  }
+
+  /** Forces the "is set" flag of timestamp to the given value. */
+  public void setTimestampIsSet(boolean value) {
+    __isset_bit_vector.set(__TIMESTAMP_ISSET_ID, value);
+  }
+
+  /**
+   * Generic setter: a null value unsets the field, any other value is cast
+   * to {@code Long} and assigned.
+   *
+   * @param field which field to modify
+   * @param value the new value as a {@code Long}, or null to unset
+   */
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case REVISION:
+      if (value == null) {
+        unsetRevision();
+      } else {
+        setRevision((Long)value);
+      }
+      break;
+
+    case TIMESTAMP:
+      if (value == null) {
+        unsetTimestamp();
+      } else {
+        setTimestamp((Long)value);
+      }
+      break;
+
+    }
+  }
+
+  /**
+   * Generic getter returning the boxed value of the requested field.
+   *
+   * @param field which field to read
+   * @return the field value as a {@code Long}
+   */
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case REVISION:
+      return Long.valueOf(getRevision());
+
+    case TIMESTAMP:
+      return Long.valueOf(getTimestamp());
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /**
+   * Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise
+   *
+   * @throws IllegalArgumentException if field is null
+   */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case REVISION:
+      return isSetRevision();
+    case TIMESTAMP:
+      return isSetTimestamp();
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Delegates to {@link #equals(StoreFamilyRevision)} for instances of this class; false otherwise. */
+  @Override
+  public boolean equals(Object that) {
+    if (that instanceof StoreFamilyRevision) {
+      return this.equals((StoreFamilyRevision)that);
+    }
+    return false;
+  }
+
+  /**
+   * Value equality: true when that is non-null and carries the same revision
+   * and timestamp. The "is set" flags are not compared.
+   */
+  public boolean equals(StoreFamilyRevision that) {
+    if (that == null) {
+      return false;
+    }
+    return this.revision == that.revision
+        && this.timestamp == that.timestamp;
+  }
+
+  /**
+   * Hash over the same two values that {@link #equals(StoreFamilyRevision)}
+   * compares, so equal instances always share a hash code.
+   */
+  @Override
+  public int hashCode() {
+    // The generated implementation returned a constant 0, which satisfies the
+    // contract but makes every instance collide in hash-based collections.
+    int result = 17;
+    result = 31 * result + (int)(this.revision ^ (this.revision >>> 32));
+    result = 31 * result + (int)(this.timestamp ^ (this.timestamp >>> 32));
+    return result;
+  }
+
+  /**
+   * Orders by revision first and timestamp second. For each field an unset
+   * value sorts before a set one, and values are compared only when set.
+   * Instances of a different runtime class are ordered by class name.
+   */
+  public int compareTo(StoreFamilyRevision other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = Boolean.valueOf(isSetRevision()).compareTo(other.isSetRevision());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetRevision()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.revision, other.revision);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetTimestamp()).compareTo(other.isSetTimestamp());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetTimestamp()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.timestamp, other.timestamp);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  /** Returns the field constant for the given Thrift id, or null if there is none. */
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  /**
+   * Populates this struct from the given protocol. Fields with an unknown id
+   * or an unexpected wire type are skipped.
+   *
+   * @param iprot protocol to read from
+   * @throws org.apache.thrift.TException if the protocol reports an error
+   */
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    org.apache.thrift.protocol.TField field;
+    iprot.readStructBegin();
+    while (true)
+    {
+      field = iprot.readFieldBegin();
+      if (field.type == org.apache.thrift.protocol.TType.STOP) {
+        break;
+      }
+      switch (field.id) {
+        case 1: // REVISION
+          if (field.type == org.apache.thrift.protocol.TType.I64) {
+            this.revision = iprot.readI64();
+            setRevisionIsSet(true);
+          } else {
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 2: // TIMESTAMP
+          if (field.type == org.apache.thrift.protocol.TType.I64) {
+            this.timestamp = iprot.readI64();
+            setTimestampIsSet(true);
+          } else {
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        default:
+          org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+      }
+      iprot.readFieldEnd();
+    }
+    iprot.readStructEnd();
+
+    // check for required fields of primitive type, which can't be checked in the validate method
+    validate();
+  }
+
+  /**
+   * Writes both fields to the given protocol, regardless of their "is set"
+   * flags.
+   *
+   * @param oprot protocol to write to
+   * @throws org.apache.thrift.TException if the protocol reports an error
+   */
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    validate();
+
+    oprot.writeStructBegin(STRUCT_DESC);
+    oprot.writeFieldBegin(REVISION_FIELD_DESC);
+    oprot.writeI64(this.revision);
+    oprot.writeFieldEnd();
+    oprot.writeFieldBegin(TIMESTAMP_FIELD_DESC);
+    oprot.writeI64(this.timestamp);
+    oprot.writeFieldEnd();
+    oprot.writeFieldStop();
+    oprot.writeStructEnd();
+  }
+
+  /** Renders the struct as {@code StoreFamilyRevision(revision:R, timestamp:T)}. */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("StoreFamilyRevision(");
+    sb.append("revision:");
+    sb.append(this.revision);
+    sb.append(", ");
+    sb.append("timestamp:");
+    sb.append(this.timestamp);
+    sb.append(")");
+    return sb.toString();
+  }
+
+  /** Hook for required-field checks; currently performs no checks. */
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+  }
+
+  /** Java serialization hook: writes the struct using Thrift's compact protocol. */
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  /** Java serialization hook: reads the struct using Thrift's compact protocol. */
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+    try {
+      // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+      // Sized for both flags, matching the field initializer.
+      __isset_bit_vector = new BitSet(2);
+      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+}
+

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevisionList.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevisionList.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevisionList.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevisionList.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This class is used to store a list of StoreFamilyRevision for a column
+ * family in zookeeper.
+ *
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.7.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package org.apache.hcatalog.hbase.snapshot.transaction.thrift;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Thrift-generated struct with a single field, revisionList (field id 1),
+ * holding a list of StoreFamilyRevision elements. The field counts as "set"
+ * whenever the list reference is non-null.
+ */
+public class StoreFamilyRevisionList implements org.apache.thrift.TBase<StoreFamilyRevisionList, StoreFamilyRevisionList._Fields>, java.io.Serializable, Cloneable {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("StoreFamilyRevisionList");
+
+  private static final org.apache.thrift.protocol.TField REVISION_LIST_FIELD_DESC = new org.apache.thrift.protocol.TField("revisionList", org.apache.thrift.protocol.TType.LIST, (short)1);
+
+  public List<StoreFamilyRevision> revisionList; // required
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    REVISION_LIST((short)1, "revisionList");
+
+    // Lookup table from field name to enum constant, filled once in the static block below.
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if its not found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // REVISION_LIST
+          return REVISION_LIST;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if its not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+
+  // Unmodifiable field metadata, built once and registered with FieldMetaData for this class.
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.REVISION_LIST, new org.apache.thrift.meta_data.FieldMetaData("revisionList", org.apache.thrift.TFieldRequirementType.DEFAULT,
+        new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
+            new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, StoreFamilyRevision.class))));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(StoreFamilyRevisionList.class, metaDataMap);
+  }
+
+  /** Creates an instance whose revisionList is unset (null). */
+  public StoreFamilyRevisionList() {
+  }
+
+  /**
+   * Creates an instance that stores the given list reference as-is (no copy is made).
+   */
+  public StoreFamilyRevisionList(
+    List<StoreFamilyRevision> revisionList)
+  {
+    this();
+    this.revisionList = revisionList;
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public StoreFamilyRevisionList(StoreFamilyRevisionList other) {
+    if (other.isSetRevisionList()) {
+      // Copy element by element through the StoreFamilyRevision copy constructor.
+      List<StoreFamilyRevision> __this__revisionList = new ArrayList<StoreFamilyRevision>();
+      for (StoreFamilyRevision other_element : other.revisionList) {
+        __this__revisionList.add(new StoreFamilyRevision(other_element));
+      }
+      this.revisionList = __this__revisionList;
+    }
+  }
+
+  public StoreFamilyRevisionList deepCopy() {
+    return new StoreFamilyRevisionList(this);
+  }
+
+  /** Resets the struct by unsetting revisionList. */
+  @Override
+  public void clear() {
+    this.revisionList = null;
+  }
+
+  /** Returns the number of elements in revisionList, or 0 when the list is unset. */
+  public int getRevisionListSize() {
+    return (this.revisionList == null) ? 0 : this.revisionList.size();
+  }
+
+  /** Returns an iterator over revisionList, or null (not an empty iterator) when the list is unset. */
+  public java.util.Iterator<StoreFamilyRevision> getRevisionListIterator() {
+    return (this.revisionList == null) ? null : this.revisionList.iterator();
+  }
+
+  /** Appends elem to revisionList, creating the list first if it is unset. */
+  public void addToRevisionList(StoreFamilyRevision elem) {
+    if (this.revisionList == null) {
+      this.revisionList = new ArrayList<StoreFamilyRevision>();
+    }
+    this.revisionList.add(elem);
+  }
+
+  /** Returns the internal list reference itself (may be null); no defensive copy is made. */
+  public List<StoreFamilyRevision> getRevisionList() {
+    return this.revisionList;
+  }
+
+  /** Stores the given list reference and returns this instance to allow chaining. */
+  public StoreFamilyRevisionList setRevisionList(List<StoreFamilyRevision> revisionList) {
+    this.revisionList = revisionList;
+    return this;
+  }
+
+  public void unsetRevisionList() {
+    this.revisionList = null;
+  }
+
+  /** Returns true if field revisionList is set (has been assigned a value) and false otherwise */
+  public boolean isSetRevisionList() {
+    return this.revisionList != null;
+  }
+
+  /** Passing false unsets revisionList; passing true has no effect. */
+  public void setRevisionListIsSet(boolean value) {
+    if (!value) {
+      this.revisionList = null;
+    }
+  }
+
+  /** Generic setter keyed by field; a null value unsets the field. */
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case REVISION_LIST:
+      if (value == null) {
+        unsetRevisionList();
+      } else {
+        // Unchecked cast: the element type of the supplied list is not verified here.
+        setRevisionList((List<StoreFamilyRevision>)value);
+      }
+      break;
+
+    }
+  }
+
+  /** Generic getter keyed by field. */
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case REVISION_LIST:
+      return getRevisionList();
+
+    }
+    // Reached only when the switch above does not handle the given field.
+    throw new IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case REVISION_LIST:
+      return isSetRevisionList();
+    }
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof StoreFamilyRevisionList)
+      return this.equals((StoreFamilyRevisionList)that);
+    return false;
+  }
+
+  /**
+   * Two instances are equal when revisionList is unset in both, or set in
+   * both and the lists are equal according to List.equals.
+   */
+  public boolean equals(StoreFamilyRevisionList that) {
+    if (that == null)
+      return false;
+
+    // The leading "true &&" is a no-op emitted by the generator.
+    boolean this_present_revisionList = true && this.isSetRevisionList();
+    boolean that_present_revisionList = true && that.isSetRevisionList();
+    if (this_present_revisionList || that_present_revisionList) {
+      if (!(this_present_revisionList && that_present_revisionList))
+        return false;
+      if (!this.revisionList.equals(that.revisionList))
+        return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Always returns 0. This is consistent with equals, but it means every
+   * instance lands in the same bucket of a hash-based collection.
+   */
+  @Override
+  public int hashCode() {
+    return 0;
+  }
+
+  /**
+   * Orders by class name when the runtime classes differ, then by whether
+   * revisionList is set, then by the list contents via TBaseHelper.compareTo.
+   */
+  public int compareTo(StoreFamilyRevisionList other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+    StoreFamilyRevisionList typedOther = (StoreFamilyRevisionList)other;
+
+    lastComparison = Boolean.valueOf(isSetRevisionList()).compareTo(typedOther.isSetRevisionList());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetRevisionList()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.revisionList, typedOther.revisionList);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  /**
+   * Populates this struct from iprot. Fields are read until a STOP marker;
+   * field id 1 of type LIST fills revisionList, and any other field (or a
+   * field 1 with an unexpected type) is skipped.
+   */
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    org.apache.thrift.protocol.TField field;
+    iprot.readStructBegin();
+    while (true)
+    {
+      field = iprot.readFieldBegin();
+      if (field.type == org.apache.thrift.protocol.TType.STOP) {
+        break;
+      }
+      switch (field.id) {
+        case 1: // REVISION_LIST
+          if (field.type == org.apache.thrift.protocol.TType.LIST) {
+            {
+              org.apache.thrift.protocol.TList _list0 = iprot.readListBegin();
+              // The list is pre-sized from the element count carried in the list header.
+              this.revisionList = new ArrayList<StoreFamilyRevision>(_list0.size);
+              for (int _i1 = 0; _i1 < _list0.size; ++_i1)
+              {
+                StoreFamilyRevision _elem2; // required
+                _elem2 = new StoreFamilyRevision();
+                _elem2.read(iprot);
+                this.revisionList.add(_elem2);
+              }
+              iprot.readListEnd();
+            }
+          } else {
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        default:
+          org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+      }
+      iprot.readFieldEnd();
+    }
+    iprot.readStructEnd();
+
+    // check for required fields of primitive type, which can't be checked in the validate method
+    validate();
+  }
+
+  /**
+   * Writes this struct to oprot. The revisionList field is emitted only when
+   * it is non-null; each element is written through its own write method.
+   */
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    validate();
+
+    oprot.writeStructBegin(STRUCT_DESC);
+    if (this.revisionList != null) {
+      oprot.writeFieldBegin(REVISION_LIST_FIELD_DESC);
+      {
+        oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, this.revisionList.size()));
+        for (StoreFamilyRevision _iter3 : this.revisionList)
+        {
+          _iter3.write(oprot);
+        }
+        oprot.writeListEnd();
+      }
+      oprot.writeFieldEnd();
+    }
+    oprot.writeFieldStop();
+    oprot.writeStructEnd();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("StoreFamilyRevisionList(");
+    // "first" is assigned but never read in this method (generator leftover for multi-field structs).
+    boolean first = true;
+
+    sb.append("revisionList:");
+    if (this.revisionList == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.revisionList);
+    }
+    first = false;
+    sb.append(")");
+    return sb.toString();
+  }
+
+  /** Validation hook; the body contains no checks, so as written it never throws. */
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+  }
+
+  /** Java serialization hook: delegates to write() with a TCompactProtocol over the stream. */
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  /** Java serialization hook: delegates to read() with a TCompactProtocol over the stream. */
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+    try {
+      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+}
+

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+
+/**
+ * A FamilyRevision consists of a revision number and an expiration
+ * timestamp. When a write transaction starts, the transaction
+ * object is appended to the transaction list of each column
+ * family and stored in the corresponding znode. When a write transaction is
+ * committed, the transaction object is removed from the list.
+ *
+ * Ordering (see compareTo) is by revision number only; the timestamp does
+ * not take part in comparisons.
+ */
+class FamilyRevision implements
+        Comparable<FamilyRevision> {
+
+    // Never reassigned after construction.
+    private final long revision;
+
+    private long timestamp;
+
+    /**
+     * Create a FamilyRevision object
+     * @param rev revision number
+     * @param ts expiration timestamp
+     */
+    FamilyRevision(long rev, long ts) {
+        this.revision = rev;
+        this.timestamp = ts;
+    }
+
+    /**
+     * @return the revision number given at construction
+     */
+    long getRevision() {
+        return revision;
+    }
+
+    /**
+     * @return the current expiration timestamp
+     */
+    long getExpireTimestamp() {
+        return timestamp;
+    }
+
+    /**
+     * Replaces the expiration timestamp.
+     * @param ts new expiration timestamp
+     */
+    void setExpireTimestamp(long ts) {
+        timestamp = ts;
+    }
+
+    @Override
+    public String toString() {
+        String description = "revision: " + revision + " ts: " + timestamp;
+        return description;
+    }
+
+    /**
+     * Orders FamilyRevision objects by ascending revision number.
+     *
+     * @param o the object to compare against; must not be null
+     * @return -1, 0 or 1 when this revision is less than, equal to or
+     *         greater than the other revision
+     */
+    @Override
+    public int compareTo(FamilyRevision o) {
+        // Compare the values directly. Subtracting them (revision - other)
+        // can overflow long and report the wrong sign, e.g. for
+        // Long.MIN_VALUE versus 1.
+        long other = o.getRevision();
+        return (revision < other) ? -1 : ((revision > other) ? 1 : 0);
+    }
+
+
+}

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/IDGenerator.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/IDGenerator.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/IDGenerator.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/IDGenerator.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hcatalog.hbase.snapshot.lock.LockListener;
+import org.apache.hcatalog.hbase.snapshot.lock.WriteLock;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * This class generates revision id's for transactions. The last used id is
+ * kept as a UTF-8 encoded decimal string in the znode passed to the
+ * constructor; increments are done while holding a WriteLock on the
+ * associated lock node.
+ */
+class IDGenerator implements LockListener{
+
+    private static final Log LOG = LogFactory.getLog(IDGenerator.class);
+    private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+    private final ZooKeeper zookeeper;
+    private final String zNodeDataLoc;
+    private final String zNodeLockBasePath;
+
+    /**
+     * @param zookeeper client used for all znode reads, writes and locking
+     * @param tableName not used by this class; kept so existing callers compile
+     * @param idGenNode path of the znode that stores the last used id
+     * @throws IOException declared for callers; not thrown by the current body
+     */
+    IDGenerator(ZooKeeper zookeeper, String tableName, String idGenNode)
+            throws IOException {
+        this.zookeeper = zookeeper;
+        this.zNodeDataLoc = idGenNode;
+        this.zNodeLockBasePath = PathUtil.getLockManagementNode(idGenNode);
+    }
+
+    /**
+     * This method obtains a revision id for a transaction.
+     *
+     * @return revision ID
+     * @throws IOException if the lock cannot be obtained, if ZooKeeper
+     *         reports an error, or if the calling thread is interrupted
+     */
+    public long obtainID() throws IOException{
+        WriteLock wLock = new WriteLock(zookeeper, zNodeLockBasePath, Ids.OPEN_ACL_UNSAFE);
+        wLock.setLockListener(this);
+        try {
+            if (!wLock.lock()) {
+                //TO DO : Let this request queue up and try obtaining lock.
+                throw new IOException("Unable to obtain lock to obtain id.");
+            }
+            // Return the value directly rather than via an instance field, so
+            // the result cannot be overwritten by another call on this object.
+            return incrementAndReadCounter();
+        } catch (KeeperException e) {
+            LOG.warn("Exception while obtaining lock for ID.", e);
+            throw new IOException("Exception while obtaining lock for ID.", e);
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to callers even though it is
+            // reported as an IOException.
+            Thread.currentThread().interrupt();
+            LOG.warn("Exception while obtaining lock for ID.", e);
+            throw new IOException("Exception while obtaining lock for ID.", e);
+        } finally {
+            // NOTE(review): unlock presumably talks to ZooKeeper, and a
+            // pending interrupt could abort that call; clear the flag for
+            // the duration of unlock and restore it afterwards. Confirm
+            // against WriteLock.unlock().
+            boolean interrupted = Thread.interrupted();
+            try {
+                wLock.unlock();
+            } finally {
+                if (interrupted) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+    }
+
+    /**
+     * This method reads the latest revision ID that has been used. The ID
+     * returned by this method cannot be used for transaction.
+     * @return revision ID
+     * @throws IOException if ZooKeeper reports an error, the thread is
+     *         interrupted, or the stored value is missing or not a number
+     */
+    public long readID() throws IOException{
+        try {
+            Stat stat = new Stat();
+            byte[] data = zookeeper.getData(this.zNodeDataLoc, false, stat);
+            return decodeId(data);
+        } catch (KeeperException e) {
+            LOG.warn("Exception while reading current revision id.", e);
+            throw new IOException("Exception while reading current revision id.", e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.warn("Exception while reading current revision id.", e);
+            throw new IOException("Exception while reading current revision id.",e);
+        }
+    }
+
+
+    /**
+     * Reads the stored id, adds one, writes the new value back and returns
+     * it. Called from obtainID() after the write lock has been acquired.
+     *
+     * @return the newly written id
+     * @throws IOException if ZooKeeper reports an error, the thread is
+     *         interrupted, or the stored value is missing or not a number
+     */
+    private long incrementAndReadCounter() throws IOException{
+        try {
+            Stat stat = new Stat();
+            byte[] data = zookeeper.getData(this.zNodeDataLoc, false, stat);
+            long curId = decodeId(data) + 1;
+            String lastUsedID = String.valueOf(curId);
+            // Version -1 makes setData skip the znode version check.
+            zookeeper.setData(this.zNodeDataLoc, lastUsedID.getBytes(UTF_8), -1 );
+            return curId;
+        } catch (KeeperException e) {
+            LOG.warn("Exception while incrementing revision id.", e);
+            throw new IOException("Exception while incrementing revision id. ", e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.warn("Exception while incrementing revision id.", e);
+            throw new IOException("Exception while incrementing revision id. ", e);
+        }
+    }
+
+    /**
+     * Decodes the znode payload as a UTF-8 decimal long. Missing or
+     * malformed data is reported as an IOException instead of an unchecked
+     * NullPointerException/NumberFormatException.
+     */
+    private long decodeId(byte[] data) throws IOException {
+        if (data == null) {
+            throw new IOException("No revision id stored at znode " + zNodeDataLoc);
+        }
+        String text = new String(data, UTF_8);
+        try {
+            return Long.parseLong(text);
+        } catch (NumberFormatException e) {
+            throw new IOException("Malformed revision id '" + text
+                    + "' at znode " + zNodeDataLoc, e);
+        }
+    }
+
+    /*
+     * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockAcquired()
+     */
+    @Override
+    public void lockAcquired() {
+        // Intentionally empty: no action is taken on this callback.
+    }
+
+    /*
+     * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockReleased()
+     */
+    @Override
+    public void lockReleased() {
+        // Intentionally empty: no action is taken on this callback.
+    }
+
+
+}

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/PathUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/PathUtil.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/PathUtil.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/PathUtil.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+
+/**
+ * Utility class that builds the znode paths used for revision management.
+ * The paths produced by the methods below are:
+ * baseDir/clock
+ * baseDir/data
+ * baseDir/data/TableA
+ * baseDir/data/TableA/idgen
+ * baseDir/data/TableA/columnFamily-1/runningTxns
+ * baseDir/data/TableA/columnFamily-1/abortData
+ * In addition, the lock node for any znode path is that path followed by
+ * "_locknode_".
+ */
+public class PathUtil{
+
+    static final String      DATA_DIR = "/data";
+    static final String      CLOCK_NODE   = "/clock";
+
+    /**
+     * Builds the path holding the currently running transactions of one
+     * column family of a table.
+     *
+     * @param baseDir the base dir for revision management
+     * @param tableName the table name
+     * @param columnFamily the column family name
+     * @return baseDir/data/tableName/columnFamily/runningTxns
+     */
+    static String getRunningTxnInfoPath(String baseDir, String tableName,
+            String columnFamily) {
+        return getTxnDataPath(baseDir, tableName) + "/" + columnFamily
+                + "/runningTxns";
+    }
+
+    /**
+     * Builds the path holding the aborted transactions of one column
+     * family of a table.
+     *
+     * @param baseDir the base dir for revision management
+     * @param tableName the table name
+     * @param columnFamily the column family name
+     * @return baseDir/data/tableName/columnFamily/abortData
+     */
+    static String getAbortInformationPath(String baseDir, String tableName,
+            String columnFamily) {
+        return getTxnDataPath(baseDir, tableName) + "/" + columnFamily
+                + "/abortData";
+    }
+
+    /**
+     * Builds the path of the revision id node of a table.
+     *
+     * @param baseDir the base dir for revision management
+     * @param tableName the table name
+     * @return baseDir/data/tableName/idgen
+     */
+    static String getRevisionIDNode(String baseDir, String tableName) {
+        return getTxnDataPath(baseDir, tableName) + "/idgen";
+    }
+
+    /**
+     * Builds the lock management node path for a znode that needs locking.
+     *
+     * @param path the path of the znode
+     * @return the given path with "_locknode_" appended
+     */
+    static String getLockManagementNode(String path) {
+        return path + "_locknode_";
+    }
+
+    /**
+     * Builds the base path under which all transaction data is kept.
+     *
+     * @param baseDir the base dir for revision management
+     * @return baseDir followed by DATA_DIR
+     */
+    static String getTransactionBasePath(String baseDir) {
+        return baseDir + DATA_DIR;
+    }
+
+    /**
+     * Builds the transaction data path of a table.
+     *
+     * @param baseDir the base dir for revision management
+     * @param tableName the table name
+     * @return baseDir/data/tableName
+     */
+    static String getTxnDataPath(String baseDir, String tableName){
+        return getTransactionBasePath(baseDir) + "/" + tableName;
+    }
+
+    /**
+     * Builds the path of the clock node.
+     *
+     * @param baseDir the base dir for revision management
+     * @return baseDir followed by CLOCK_NODE
+     */
+    static String getClockPath(String baseDir) {
+        return baseDir + CLOCK_NODE;
+    }
+}

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * This interface provides APIs for implementing revision management:
+ * lifecycle (initialize/open/close), write transactions (begin, commit,
+ * abort, keep-alive) and table snapshots.
+ */
+public interface RevisionManager {
+
+    /** Property key under which the implementation class name is configured. */
+    public static final String REVISION_MGR_IMPL_CLASS = "revision.manager.impl.class";
+
+    /**
+     * Initialize the revision manager.
+     *
+     * @param properties configuration for the implementation; the keys read
+     *        are implementation specific
+     */
+    public void initialize(Properties properties);
+
+    /**
+     * Opens the revision manager.
+     *
+     * @throws IOException
+     */
+    public void open() throws IOException;
+
+    /**
+     * Closes the revision manager.
+     *
+     * @throws IOException
+     */
+    public void close() throws IOException;
+
+    /**
+     * Start the write transaction.
+     *
+     * @param table the table to write to (presumably the table name; confirm against implementations)
+     * @param families the column families taking part in the transaction
+     * @return the Transaction representing the started write
+     * @throws IOException
+     */
+    public Transaction beginWriteTransaction(String table, List<String> families)
+            throws IOException;
+
+    /**
+     * Start the write transaction.
+     *
+     * @param table the table to write to (presumably the table name; confirm against implementations)
+     * @param families the column families taking part in the transaction
+     * @param keepAlive keep-alive value for the transaction (NOTE(review): unit
+     *        and exact meaning are not stated here; confirm against implementations)
+     * @return the Transaction representing the started write
+     * @throws IOException
+     */
+    public Transaction beginWriteTransaction(String table,
+            List<String> families, long keepAlive) throws IOException;
+
+    /**
+     * Commit the write transaction.
+     *
+     * @param transaction the transaction to commit
+     * @throws IOException
+     */
+    public void commitWriteTransaction(Transaction transaction)
+            throws IOException;
+
+    /**
+     * Abort the write transaction.
+     *
+     * @param transaction the transaction to abort
+     * @throws IOException
+     */
+    public void abortWriteTransaction(Transaction transaction)
+            throws IOException;
+
+    /**
+     * Create the latest snapshot of the table.
+     *
+     * @param tableName the name of the table
+     * @return the snapshot of the table
+     * @throws IOException
+     */
+    public TableSnapshot createSnapshot(String tableName) throws IOException;
+
+    /**
+     * Create the snapshot of the table using the revision number.
+     *
+     * @param tableName the name of the table
+     * @param revision the revision number to create the snapshot for
+     * @return the snapshot of the table
+     * @throws IOException
+     */
+    public TableSnapshot createSnapshot(String tableName, long revision)
+            throws IOException;
+
+    /**
+     * Extends the expiration of a transaction by the time indicated by keep alive.
+     *
+     * @param transaction the transaction whose expiration is extended
+     * @throws IOException
+     */
+    public void keepAlive(Transaction transaction) throws IOException;
+
+}

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public class RevisionManagerFactory {
+
+   /**
+    * Creates and initializes an instance of revision manager.
+    *
+    * The implementation class name is read from the
+    * {@link RevisionManager#REVISION_MGR_IMPL_CLASS} property and defaults to
+    * {@link ZKBasedRevisionManager}. The class is loaded through the thread
+    * context class loader (or, if there is none, the loader of this class),
+    * instantiated through its no-argument constructor and initialized with
+    * the given properties.
+    *
+    * @param properties The properties required to create the revision manager.
+    * @return An initialized instance of revision manager.
+    * @throws IOException If the implementation class cannot be found, does
+    *         not implement RevisionManager, cannot be instantiated, or
+    *         rejects the supplied properties.
+    */
+   public static RevisionManager getRevisionManager(Properties properties) throws IOException{
+
+        ClassLoader classLoader = Thread.currentThread()
+                .getContextClassLoader();
+        if (classLoader == null) {
+            classLoader = RevisionManagerFactory.class.getClassLoader();
+        }
+        String className = properties.getProperty(
+                RevisionManager.REVISION_MGR_IMPL_CLASS,
+                ZKBasedRevisionManager.class.getName());
+
+        Class<? extends RevisionManager> revisionMgrClass;
+        try {
+            // asSubclass checks the configured class against RevisionManager
+            // right here; an unchecked cast would let a wrong class escape
+            // later as a raw ClassCastException instead of an IOException.
+            revisionMgrClass = Class.forName(className, true, classLoader)
+                    .asSubclass(RevisionManager.class);
+        } catch (ClassNotFoundException e) {
+            throw new IOException(
+                    "The implementation class of revision manager not found.",
+                    e);
+        } catch (ClassCastException e) {
+            throw new IOException(
+                    "The configured revision manager class " + className
+                            + " does not implement "
+                            + RevisionManager.class.getName() + ".",
+                    e);
+        }
+
+        try {
+            RevisionManager revisionMgr = revisionMgrClass.newInstance();
+            revisionMgr.initialize(properties);
+            return revisionMgr;
+        } catch (InstantiationException e) {
+            throw new IOException(
+                    "Exception encountered during instantiating revision manager implementation.",
+                    e);
+        } catch (IllegalAccessException e) {
+            throw new IOException(
+                    "IllegalAccessException encountered during instantiating revision manager implementation.",
+                    e);
+        } catch (IllegalArgumentException e) {
+            throw new IOException(
+                    "IllegalArgumentException encountered during instantiating revision manager implementation.",
+                    e);
+        }
+    }
+
+}

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The snapshot for a table and a list of column families. It records, for
+ * each column family, the revision number the snapshot refers to.
+ */
+public class TableSnapshot {
+
+    private final String name;
+
+    // Column family name -> revision. The map passed to the constructor is
+    // kept by reference (no copy is made).
+    private final Map<String, Long> cfRevisionMap;
+
+
+    /**
+     * Creates a snapshot.
+     *
+     * @param name The name of the table.
+     * @param cfRevMap The revision of each column family, keyed by column
+     *        family name.
+     */
+    public TableSnapshot(String name, Map<String, Long> cfRevMap) {
+        this.name = name;
+        this.cfRevisionMap = cfRevMap;
+    }
+
+    /**
+     * Gets the table name.
+     *
+     * @return String The name of the table.
+     */
+    public String getTableName() {
+        return name;
+    }
+
+    /**
+     * Gets the column families.
+     *
+     * @return A new list holding the column families associated with the
+     *         snapshot; changes to it do not affect the snapshot.
+     */
+    public List<String> getColumnFamilies(){
+        return  new ArrayList<String>(this.cfRevisionMap.keySet());
+    }
+
+    /**
+     * Gets the revision.
+     *
+     * @param familyName The name of the column family.
+     * @return the revision recorded for the column family
+     * @throws IllegalArgumentException if the snapshot has no revision for
+     *         the given column family
+     */
+    public long getRevision(String familyName){
+        Long revision = this.cfRevisionMap.get(familyName);
+        if (revision == null) {
+            // Unboxing the null directly would surface as a
+            // NullPointerException without any message.
+            throw new IllegalArgumentException(
+                    "No revision available for column family " + familyName
+                            + " in the snapshot of table " + name + ".");
+        }
+        return revision.longValue();
+    }
+
+    @Override
+    public String toString() {
+        String snapshot = "Table Name : " + name
+                + " Column Familiy revision : " + cfRevisionMap.toString();
+        return snapshot;
+    }
+}

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class is responsible for storing information related to
+ * transactions.
+ */
+public class Transaction implements Serializable {
+
+    private String tableName;
+    private List<String> columnFamilies = new ArrayList<String>();
+    private long timeStamp;
+    private long keepAlive;
+    private long revision;
+
+
+    Transaction(String tableName, List<String> columnFamilies, long revision, long timestamp) {
+        this.tableName = tableName;
+        this.columnFamilies = columnFamilies;
+        this.timeStamp = timestamp;
+        this.revision = revision;
+    }
+
+   /**
+     * @return The revision number associated with a transaction.
+     */
+   public long getRevisionNumber(){
+       return this.revision;
+   }
+
+    /**
+     * @return The table name associated with a transaction.
+     */
+   public String getTableName() {
+        return tableName;
+    }
+
+    /**
+     * @return The column families associated with a transaction.
+     */
+   public List<String> getColumnFamilies() {
+        return columnFamilies;
+    }
+
+    /**
+     * @return The expire timestamp associated with a transaction.
+     */
+   long getTransactionExpireTimeStamp(){
+        return this.timeStamp + this.keepAlive;
+    }
+
+    void setKeepAlive(long seconds){
+        this.keepAlive = seconds;
+    }
+
+    /**
+     * Gets the keep alive value.
+     *
+     * @return long  The keep alive value for the transaction.
+     */
+    public long getKeepAliveValue(){
+        return this.keepAlive;
+    }
+
+    /**
+     * Gets the family revision info.
+     *
+     * @return FamilyRevision An instance of FamilyRevision associated with the transaction.
+     */
+    FamilyRevision getFamilyRevisionInfo(){
+        return new FamilyRevision(revision, getTransactionExpireTimeStamp());
+    }
+
+   /**
+    * Keep alive transaction. This methods extends the expire timestamp of a
+    * transaction by the "keep alive" amount.
+    */
+   void keepAliveTransaction(){
+        this.timeStamp = this.timeStamp + this.keepAlive;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Revision : ");
+        sb.append(this.getRevisionNumber());
+        sb.append(" Timestamp : ");
+        sb.append(this.getTransactionExpireTimeStamp());
+        sb.append("\n").append("Table : ");
+        sb.append(this.tableName).append("\n");
+        sb.append("Column Families : ");
+        sb.append(this.columnFamilies.toString());
+        return sb.toString();
+    }
+}

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hcatalog.hbase.snapshot.lock.LockListener;
+import org.apache.hcatalog.hbase.snapshot.lock.WriteLock;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+
+/**
+ * The service for providing revision management to Hbase tables.
+ *
+ * Transaction bookkeeping is delegated to a {@link ZKUtil} instance, and
+ * every update of the per-column-family transaction lists is done while
+ * holding a {@link WriteLock} on the lock node of the table.
+ * {@link #open()} must be called before any transaction or snapshot method.
+ */
+public class ZKBasedRevisionManager implements RevisionManager{
+
+    public static final String HOSTLIST = "revision.manager.zk.HostList";
+    public static final String DATADIR = "revision.manager.zk.DataDir";
+    // Keep alive value applied when the caller passes -1.
+    // NOTE(review): presumably milliseconds (4 hours) - confirm the unit.
+    private static final int DEFAULT_WRITE_TRANSACTION_TIMEOUT = 14400000;
+    private static final Log LOG = LogFactory.getLog(ZKBasedRevisionManager.class);
+    private String zkHostList;
+    private String baseDir;
+    private ZKUtil zkUtil;
+
+
+    /**
+     * Reads the host list (default "localhost:2181") and the base data
+     * directory (default "/revision-management") from the properties.
+     *
+     * @param properties The configuration of the revision manager.
+     * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#initialize()
+     */
+    @Override
+    public void initialize(Properties properties) {
+        this.zkHostList = properties.getProperty(ZKBasedRevisionManager.HOSTLIST, "localhost:2181");
+        this.baseDir = properties.getProperty(ZKBasedRevisionManager.DATADIR,"/revision-management");
+    }
+
+    /**
+     * Open a ZooKeeper connection and create the root znodes.
+     * @throws java.io.IOException
+     */
+    public void open() throws IOException {
+        zkUtil = new ZKUtil(zkHostList, this.baseDir);
+        zkUtil.createRootZNodes();
+        LOG.info("Created root znodes for revision manager.");
+    }
+
+    /**
+     * Close Zookeeper connection. Does nothing if {@link #open()} was
+     * never called.
+     */
+    public void close() {
+        if (zkUtil != null) {
+            zkUtil.closeZKConnection();
+        }
+    }
+
+    /**
+     * Validates the arguments of a transaction request.
+     *
+     * @param table The table name; must not be null.
+     * @param families The column families; must not be null or empty.
+     * @throws IllegalArgumentException if either argument is missing
+     */
+    private void checkInputParams(String table, List<String> families) {
+        if (table == null) {
+            throw new IllegalArgumentException(
+                    "The table name must be specified for reading.");
+        }
+        if (families == null || families.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "At least one column family should be specified for reading.");
+        }
+    }
+
+    /**
+     * Creates (without acquiring) the write lock guarding the transaction
+     * data of a table, making sure its lock node exists first.
+     *
+     * @param tableName The table whose lock is needed.
+     * @return A WriteLock with an RMLockListener attached.
+     * @throws IOException
+     */
+    private WriteLock newTableLock(String tableName) throws IOException {
+        String lockPath = prepareLockNode(tableName);
+        WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
+                Ids.OPEN_ACL_UNSAFE);
+        wLock.setLockListener(new RMLockListener());
+        return wLock;
+    }
+
+    /**
+     * Begins a write transaction: obtains the next revision number of the
+     * table and appends the transaction to the running-transaction list of
+     * every column family involved.
+     *
+     * @param table The table name.
+     * @param families The column families involved in the transaction.
+     * @param keepAlive The keep alive value of the transaction, or -1 to
+     *        use the default.
+     * @return The transaction which was started.
+     * @throws IOException
+     * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#beginWriteTransaction(java.lang.String, java.util.List, long)
+     */
+    public Transaction beginWriteTransaction(String table,
+            List<String> families, long keepAlive) throws IOException {
+
+        checkInputParams(table, families);
+        zkUtil.setUpZnodesForTable(table, families);
+        long nextId = zkUtil.nextId(table);
+        long expireTimestamp = zkUtil.getTimeStamp();
+        Transaction transaction = new Transaction(table, families, nextId,
+                expireTimestamp);
+        if (keepAlive != -1) {
+            transaction.setKeepAlive(keepAlive);
+        } else {
+            transaction.setKeepAlive(DEFAULT_WRITE_TRANSACTION_TIMEOUT);
+        }
+
+        refreshTransactionList(transaction.getTableName());
+        WriteLock wLock = newTableLock(table);
+        boolean interrupted = false;
+        try {
+            if (!wLock.lock()) {
+              //TO DO : Let this request queue up and try obtaining lock.
+                throw new IOException(
+                        "Unable to obtain lock while beginning transaction. "
+                                + transaction.toString());
+            }
+            FamilyRevision revisionData = transaction.getFamilyRevisionInfo();
+            for (String cfamily : transaction.getColumnFamilies()) {
+                String path = PathUtil.getRunningTxnInfoPath(
+                        baseDir, table, cfamily);
+                zkUtil.updateData(path, revisionData,
+                        ZKUtil.UpdateMode.APPEND);
+            }
+        } catch (KeeperException e) {
+            throw new IOException("Exception while obtaining lock.", e);
+        } catch (InterruptedException e) {
+            interrupted = true;
+            throw new IOException("Exception while obtaining lock.", e);
+        } finally {
+            wLock.unlock();
+            // Restored only after unlock() so that the release runs with
+            // the same interrupt status it had before.
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        return transaction;
+    }
+
+    /**
+     * Begins a write transaction with the default keep alive value.
+     *
+     * @param table The table name.
+     * @param families The column families involved in the transaction.
+     * @return transaction The transaction which was started.
+     * @throws IOException
+     * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#beginWriteTransaction(java.lang.String, java.util.List)
+     */
+    public Transaction beginWriteTransaction(String table, List<String> families)
+            throws IOException {
+        return beginWriteTransaction(table, families, -1);
+    }
+
+    /**
+     * This method commits a write transaction by removing it from the
+     * running-transaction list of each of its column families.
+     * @param transaction The revision information associated with transaction.
+     * @throws java.io.IOException
+     */
+    public void commitWriteTransaction(Transaction transaction) throws IOException {
+        String tableName = transaction.getTableName();
+        refreshTransactionList(tableName);
+
+        WriteLock wLock = newTableLock(tableName);
+        boolean interrupted = false;
+        try {
+            if (!wLock.lock()) {
+              //TO DO : Let this request queue up and try obtaining lock.
+                throw new IOException(
+                        "Unable to obtain lock while commiting transaction. "
+                                + transaction.toString());
+            }
+            FamilyRevision revisionData = transaction.getFamilyRevisionInfo();
+            for (String cfamily : transaction.getColumnFamilies()) {
+                String path = PathUtil.getRunningTxnInfoPath(
+                        baseDir, tableName, cfamily);
+                zkUtil.updateData(path, revisionData,
+                        ZKUtil.UpdateMode.REMOVE);
+            }
+        } catch (KeeperException e) {
+            throw new IOException("Exception while obtaining lock.", e);
+        } catch (InterruptedException e) {
+            interrupted = true;
+            throw new IOException("Exception while obtaining lock.", e);
+        } finally {
+            wLock.unlock();
+            // Restored only after unlock() so that the release runs with
+            // the same interrupt status it had before.
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+        LOG.info("Write Transaction committed: " + transaction.toString());
+    }
+
+    /**
+     * This method aborts a write transaction: for each of its column
+     * families the transaction is removed from the running-transaction
+     * list and appended to the abort information.
+     * @param transaction The revision information associated with transaction.
+     * @throws java.io.IOException
+     */
+    public void abortWriteTransaction(Transaction transaction) throws IOException {
+
+        String tableName = transaction.getTableName();
+        refreshTransactionList(tableName);
+        WriteLock wLock = newTableLock(tableName);
+        boolean interrupted = false;
+        try {
+            if (!wLock.lock()) {
+              //TO DO : Let this request queue up and try obtaining lock.
+                throw new IOException(
+                        "Unable to obtain lock while aborting transaction. "
+                                + transaction.toString());
+            }
+            FamilyRevision revisionData = transaction
+                    .getFamilyRevisionInfo();
+            for (String cfamily : transaction.getColumnFamilies()) {
+                String path = PathUtil.getRunningTxnInfoPath(
+                        baseDir, tableName, cfamily);
+                zkUtil.updateData(path, revisionData,
+                        ZKUtil.UpdateMode.REMOVE);
+                path = PathUtil.getAbortInformationPath(baseDir,
+                        tableName, cfamily);
+                zkUtil.updateData(path, revisionData,
+                        ZKUtil.UpdateMode.APPEND);
+            }
+        } catch (KeeperException e) {
+            throw new IOException("Exception while obtaining lock.", e);
+        } catch (InterruptedException e) {
+            interrupted = true;
+            throw new IOException("Exception while obtaining lock.", e);
+        } finally {
+            wLock.unlock();
+            // Restored only after unlock() so that the release runs with
+            // the same interrupt status it had before.
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+        LOG.info("Write Transaction aborted: " + transaction.toString());
+    }
+
+
+    /**
+     * Extends the expiration of a transaction by its keep alive value and
+     * publishes the new expiration for each of its column families.
+     *
+     * @param transaction The transaction to keep alive.
+     * @throws IOException
+     * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#keepAlive(org.apache.hcatalog.hbase.snapshot.Transaction)
+     */
+     public void keepAlive(Transaction transaction)
+            throws IOException {
+
+         String tableName = transaction.getTableName();
+         refreshTransactionList(tableName);
+         WriteLock wLock = newTableLock(tableName);
+         boolean interrupted = false;
+         try {
+             if (!wLock.lock()) {
+               //TO DO : Let this request queue up and try obtaining lock.
+                 throw new IOException(
+                         "Unable to obtain lock for keep alive of transaction. "
+                                 + transaction.toString());
+             }
+             // Extend the local expiration only once the lock is held, so a
+             // failed lock attempt leaves the transaction object unchanged.
+             transaction.keepAliveTransaction();
+             FamilyRevision revisionData = transaction.getFamilyRevisionInfo();
+             for (String cfamily : transaction.getColumnFamilies()) {
+                 String path = PathUtil.getRunningTxnInfoPath(
+                         baseDir, tableName, cfamily);
+                 zkUtil.updateData(path, revisionData,
+                         ZKUtil.UpdateMode.KEEP_ALIVE);
+             }
+         } catch (KeeperException e) {
+             throw new IOException("Exception while obtaining lock.", e);
+         } catch (InterruptedException e) {
+             interrupted = true;
+             throw new IOException("Exception while obtaining lock.", e);
+         } finally {
+             wLock.unlock();
+             // Restored only after unlock() so that the release runs with
+             // the same interrupt status it had before.
+             if (interrupted) {
+                 Thread.currentThread().interrupt();
+             }
+         }
+
+    }
+
+    /**
+     * This method allows the user to create latest snapshot of a table.
+     * For each column family the snapshot revision is one less than the
+     * smallest running transaction, or the current revision of the table
+     * when no transaction is running on that family.
+     *
+     * @param tableName The table whose snapshot is being created.
+     * @return TableSnapshot An instance of TableSnapshot
+     * @throws IOException
+     * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#createSnapshot(java.lang.String)
+     */
+    public TableSnapshot createSnapshot(String tableName) throws IOException{
+        refreshTransactionList(tableName);
+        long latestID = zkUtil.currentID(tableName);
+        HashMap<String, Long> cfMap = new HashMap<String, Long>();
+        List<String> columnFamilyNames = zkUtil.getColumnFamiliesOfTable(tableName);
+
+        for(String cfName: columnFamilyNames){
+            String cfPath = PathUtil.getRunningTxnInfoPath(baseDir, tableName, cfName);
+            List<FamilyRevision> tranxList = zkUtil.getTransactionList(cfPath);
+            long version;
+            if (!tranxList.isEmpty()) {
+                // get the smallest running Transaction ID; only the minimum
+                // is needed, so the list is not sorted
+                long runningVersion = Collections.min(tranxList).getRevision();
+                version = runningVersion -1;
+            } else {
+                version = latestID;
+            }
+            cfMap.put(cfName, version);
+        }
+
+        return new TableSnapshot(tableName, cfMap);
+    }
+
+    /**
+     * This method allows the user to create snapshot of a table with a
+     * given revision number, which is applied to every column family of
+     * the table.
+     *
+     * @param tableName The table whose snapshot is being created.
+     * @param revision The revision number of the snapshot; must not exceed
+     *        the current revision of the table.
+     * @return TableSnapshot An instance of TableSnapshot
+     * @throws IOException if the revision is higher than the current
+     *         revision of the table
+     * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#createSnapshot(java.lang.String, long)
+     */
+    public TableSnapshot createSnapshot(String tableName, long revision) throws IOException{
+
+        long currentID = zkUtil.currentID(tableName);
+        if (revision > currentID) {
+            throw new IOException(
+                    "The revision specified in the snapshot is higher than the current revision of the table.");
+        }
+        refreshTransactionList(tableName);
+        HashMap<String, Long> cfMap = new HashMap<String, Long>();
+        List<String> columnFamilies = zkUtil.getColumnFamiliesOfTable(tableName);
+
+        for(String cf: columnFamilies){
+            cfMap.put(cf, revision);
+        }
+
+        return new TableSnapshot(tableName, cfMap);
+    }
+
+    /**
+     * Get the list of in-progress Transactions for a column family
+     * @param table the table name
+     * @param columnFamily the column family name
+     * @return a list of in-progress WriteTransactions
+     * @throws java.io.IOException
+     */
+    List<FamilyRevision> getRunningTransactions(String table,
+            String columnFamily) throws IOException {
+        String path = PathUtil.getRunningTxnInfoPath(baseDir, table,
+                columnFamily);
+        return zkUtil.getTransactionList(path);
+    }
+
+    /**
+     * Get the list of aborted Transactions for a column family
+     * @param table the table name
+     * @param columnFamily the column family name
+     * @return a list of aborted WriteTransactions
+     * @throws java.io.IOException
+     */
+     List<FamilyRevision> getAbortedWriteTransactions(String table,
+            String columnFamily) throws IOException {
+         String path = PathUtil.getAbortInformationPath(baseDir, table, columnFamily);
+         return zkUtil.getTransactionList(path);
+    }
+
+    /**
+     * Refreshes, under the table lock, the running-transaction data of
+     * every column family of the table.
+     *
+     * @param tableName The table whose transaction lists are refreshed.
+     * @throws IOException
+     */
+     private void refreshTransactionList(String tableName) throws IOException{
+        WriteLock wLock = newTableLock(tableName);
+        boolean interrupted = false;
+        try {
+            if (!wLock.lock()) {
+              //TO DO : Let this request queue up and try obtaining lock.
+                throw new IOException(
+                        "Unable to obtain lock while refreshing transactions of table "
+                                + tableName + ".");
+            }
+            List<String> cfPaths = zkUtil
+                    .getColumnFamiliesOfTable(tableName);
+            for (String cf : cfPaths) {
+                String runningDataPath = PathUtil.getRunningTxnInfoPath(
+                        baseDir, tableName, cf);
+                zkUtil.refreshTransactions(runningDataPath);
+            }
+        } catch (KeeperException e) {
+            throw new IOException("Exception while obtaining lock.", e);
+        } catch (InterruptedException e) {
+            interrupted = true;
+            throw new IOException("Exception while obtaining lock.", e);
+        } finally {
+            wLock.unlock();
+            // Restored only after unlock() so that the release runs with
+            // the same interrupt status it had before.
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+
+     }
+
+    /**
+     * Makes sure the lock management node of the table exists.
+     *
+     * @param tableName The table whose lock node is needed.
+     * @return The path of the lock node.
+     * @throws IOException
+     */
+     private String prepareLockNode(String tableName) throws IOException{
+         String txnDataPath = PathUtil.getTxnDataPath(this.baseDir, tableName);
+         String lockPath = PathUtil.getLockManagementNode(txnDataPath);
+         zkUtil.ensurePathExists(lockPath, null, Ids.OPEN_ACL_UNSAFE,
+                 CreateMode.PERSISTENT);
+         return lockPath;
+     }
+
+
+    /*
+     * This class is a listener class for the locks used in revision management.
+     * Both callbacks are currently empty.
+     * TBD: Use the following class to signal that the lock has actually
+     * been granted.
+     */
+     class RMLockListener implements LockListener {
+
+        /*
+         * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockAcquired()
+         */
+        @Override
+        public void lockAcquired() {
+
+        }
+
+        /*
+         * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockReleased()
+         */
+        @Override
+        public void lockReleased() {
+
+        }
+
+     }
+
+
+}