You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by to...@apache.org on 2011/12/21 07:43:05 UTC
svn commit: r1221635 [1/2] - in /incubator/hcatalog/trunk: ./
storage-drivers/hbase/ storage-drivers/hbase/if/ storage-drivers/hbase/ivy/
storage-drivers/hbase/src/gen-java/ storage-drivers/hbase/src/gen-java/org/
storage-drivers/hbase/src/gen-java/org...
Author: toffer
Date: Wed Dec 21 07:43:04 2011
New Revision: 1221635
URL: http://svn.apache.org/viewvc?rev=1221635&view=rev
Log:
HCATALOG-189 : ZooKeeper-based revision manager for HBase Storage Driver (avandana via toffer)
Added:
incubator/hcatalog/trunk/storage-drivers/hbase/if/
incubator/hcatalog/trunk/storage-drivers/hbase/if/transaction.thrift
incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/
incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/
incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/
incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/
incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/
incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/
incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/
incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/
incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevision.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevisionList.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/IDGenerator.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/PathUtil.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/LockListener.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ProtocolSupport.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/WriteLock.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZNodeName.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZooKeeperOperation.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/IDGenClient.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestIDGenerator.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestThriftSerialization.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/WriteLockTest.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/ZNodeNameTest.java
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/storage-drivers/hbase/build.xml
incubator/hcatalog/trunk/storage-drivers/hbase/ivy.xml
incubator/hcatalog/trunk/storage-drivers/hbase/ivy/libraries.properties
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1221635&r1=1221634&r2=1221635&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Wed Dec 21 07:43:04 2011
@@ -23,6 +23,8 @@ Trunk (unreleased changes)
INCOMPATIBLE CHANGES
NEW FEATURES
+ HCAT-189. ZooKeeper-based revision manager for HBase Storage Driver (avandana via toffer)
+
HCAT-145. Add support for binary data type (hashutosh)
HCAT-151. Fixed native table names used for tables stored in non-default DBs in HBaseInputStorageDriver (avandana via toffer)
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/build.xml
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/build.xml?rev=1221635&r1=1221634&r2=1221635&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/build.xml (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/build.xml Wed Dec 21 07:43:04 2011
@@ -35,7 +35,6 @@
<property name="driver.version" value="0.1.0"/>
<property name="driver.jar" value="${ant.project.name}-${driver.version}.jar"/>
<property name="final.name" value="${ant.project.name}-${driver.version}" />
-
<property name="hcatalog.dir" value="${basedir}/../../" />
<!-- hive properties -->
@@ -65,6 +64,8 @@
<property name="test.build.dir" value="${build.dir}/test" />
<property name="test.build.classes" value="${test.build.dir}/classes" />
<property name="test.log.dir" value="${test.build.dir}/logs" />
+ <property name="test.tmp.dir" value="${test.build.dir}/temp" />
+ <property name="test.data.dir" value="${test.build.dir}/data" />
<property name="test.timeout" value="2700000" />
<property name="test.junit.output.format" value="plain" />
<property name="test.all.file" value="${test.src.dir}/all-tests"/>
@@ -110,10 +111,37 @@
<pathelement location="${build.classes}" />
<pathelement location="conf"/>
<pathelement location="${hive.conf.dir}"/>
+ <!-- jars Thrift depends on -->
+ <fileset dir="${ivy.lib.dir}" includes="libthrift*.jar"/>
+ <!-- jars Hive depends on -->
+ <fileset dir="${hive.root}/build/ivy/lib/default/">
+ <include name="**/*.jar" />
+ <exclude name="*hbase*.jar" />
+ </fileset>
+ <!-- jars Hadoop depends on -->
+ <fileset dir="${hive.root}/build/hadoopcore/hadoop-0.20.3-CDH3-SNAPSHOT/lib/" >
+ <include name="**/*.jar" />
+ </fileset>
+ <pathelement location="${driver.jar}"/>
+ <path refid="classpath"/>
+ <fileset dir="${hive.root}/build/hadoopcore/hadoop-0.20.3-CDH3-SNAPSHOT/"
+ includes="hadoop-test-0.20.3-CDH3-SNAPSHOT.jar"/>
+ </path>
+
+ <path id="test.zookeeper.classpath">
+ <pathelement location="${test.build.classes}" />
+ <pathelement location="${build.classes}" />
+ <pathelement location="conf"/>
+ <pathelement location="${hive.conf.dir}"/>
+ <!-- jars Zookeeper depends on -->
+ <fileset dir="${ivy.lib.dir}" includes="zookeeper*.jar"/>
+ <!-- jars Thrift depends on -->
+ <fileset dir="${ivy.lib.dir}" includes="libthrift*.jar"/>
<!-- jars Hive depends on -->
<fileset dir="${hive.root}/build/ivy/lib/default/">
<include name="**/*.jar" />
<exclude name="*hbase*.jar" />
+ <exclude name="*zookeeper*.jar" />
</fileset>
<!-- jars Hadoop depends on -->
<fileset dir="${hive.root}/build/hadoopcore/hadoop-0.20.3-CDH3-SNAPSHOT/lib/" >
@@ -195,7 +223,6 @@
<mkdir dir="${build.classes}" />
<mkdir dir="${test.build.classes}" />
</target>
-
<!--
================================================================================
Main Build and Jar Section
@@ -203,7 +230,7 @@
-->
<!-- Compile src files -->
<target name="compile-src" depends="init">
- <javac encoding="${build.encoding}" srcdir="${src.dir}" excludes="${excludes}"
+ <javac encoding="${build.encoding}" srcdir="${src.dir}:${basedir}/src/gen-java" excludes="${excludes}"
includes="**/*.java" destdir="${build.classes}" debug="${javac.debug}"
optimize="${javac.optimize}" target="${javac.version}"
source="${javac.version}" deprecation="${javac.deprecation}"
@@ -240,8 +267,12 @@
<sequential>
<delete dir="${test.log.dir}"/>
+ <delete dir="${test.tmp.dir}" />
+ <delete dir="${test.data.dir}" />
<mkdir dir="${test.log.dir}"/>
- <junit showoutput="${test.output}" printsummary="yes" haltonfailure="no"
+ <mkdir dir="${test.tmp.dir}" />
+ <mkdir dir="${test.data.dir}" />
+ <junit showoutput="${test.output}" printsummary="yes" haltonfailure="no"
fork="yes" maxmemory="512m" dir="${basedir}" timeout="${test.timeout}"
errorProperty="tests.failed" failureProperty="tests.failed">
<classpath>
@@ -261,7 +292,7 @@
</batchtest>
<!-- Run one test case. To use this define -Dtestcase=X on the command line -->
<batchtest fork="yes" todir="${test.log.dir}" if="testcase">
- <fileset dir="src/test" includes="**/${testcase}.java"/>
+ <fileset dir="src/test" includes="**/${testcase}.java" excludes="**/lock/*.java"/>
</batchtest>
<assertions>
@@ -269,6 +300,31 @@
</assertions>
</junit>
+ <!-- TODO: Remove the following when using HBase 0.92 -->
+ <junit showoutput="${test.output}" printsummary="yes" haltonfailure="no"
+ fork="yes" maxmemory="512m" dir="${basedir}" timeout="${test.timeout}"
+ errorProperty="tests.failed" failureProperty="tests.failed">
+ <sysproperty key="build.test.dir" value="${test.tmp.dir}" />
+ <sysproperty key="test.data.dir" value="${test.data.dir}" />
+ <sysproperty key="log4j.configuration" value="file:${basedir}/log4j.properties" />
+ <classpath>
+ <pathelement location="${test.build.classes}" />
+ <pathelement location="." />
+ <path refid="test.zookeeper.classpath"/>
+ </classpath>
+ <formatter type="${test.junit.output.format}" />
+ <batchtest fork="yes" todir="${test.log.dir}" unless="testcase">
+ <fileset dir="src/test" includes="**/lock/*Test.java"/>
+ </batchtest>
+ <!-- Run one test case. To use this define -Dtestcase=X on the command line -->
+ <batchtest fork="yes" todir="${test.log.dir}" if="testcase">
+ <fileset dir="src/test" includes="**/lock/${testcase}.java"/>
+ </batchtest>
+ <assertions>
+ <enable />
+ </assertions>
+
+ </junit>
<fail if="tests.failed">Tests failed!</fail>
</sequential>
</target>
@@ -325,8 +381,10 @@
<include name="**/*"/>
<!-- exclude test jars -->
<exclude name="*-tests.jar"/>
+ <exclude name="*zookeeper*.jar"/>
</fileset>
<fileset dir="${hive.root}/build/hbase-handler" includes="*.jar"/>
+ <fileset dir="${hive.root}/build/ivy/lib/default" includes="*zookeeper*.jar"/>
</copy>
Added: incubator/hcatalog/trunk/storage-drivers/hbase/if/transaction.thrift
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/if/transaction.thrift?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/if/transaction.thrift (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/if/transaction.thrift Wed Dec 21 07:43:04 2011
@@ -0,0 +1,11 @@
+namespace java org.apache.hcatalog.hbase.snapshot.transaction.thrift
+namespace cpp Apache.HCatalog.HBase
+
+struct StoreFamilyRevision {
+ 1: i64 revision,
+ 2: i64 timestamp
+}
+
+struct StoreFamilyRevisionList {
+ 1: list<StoreFamilyRevision> revisionList
+}
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/ivy.xml
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/ivy.xml?rev=1221635&r1=1221634&r2=1221635&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/ivy.xml (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/ivy.xml Wed Dec 21 07:43:04 2011
@@ -41,6 +41,11 @@
<artifact name="hbase" type="jar" ext="jar"/>
<artifact name="hbase" type="test-jar" ext="jar" m:classifier="tests"/>
</dependency>
- <dependency org="org.apache.zookeeper" name="zookeeper" rev="${zookeeper.version}" conf="common->master"/>
+ <dependency org="org.apache.zookeeper" name="zookeeper" rev="${zookeeper.version}"
+ conf="common->master">
+ <artifact name="zookeeper" type="jar" ext="jar"/>
+ <artifact name="zookeeper" type="test-jar" ext="jar" m:classifier="tests"/>
+ </dependency>
+ <dependency org="org.apache.thrift" name="libthrift" rev="${thrift.version}" conf="common->master"/>
</dependencies>
</ivy-module>
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/ivy/libraries.properties?rev=1221635&r1=1221634&r2=1221635&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/ivy/libraries.properties (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/ivy/libraries.properties Wed Dec 21 07:43:04 2011
@@ -17,4 +17,5 @@ junit.version=3.8.1
ivy.version=2.2.0
rats-lib.version=0.5.1
hbase.version=0.90.3
-zookeeper.version=3.3.1
+zookeeper.version=3.4.0
+thrift.version=0.7.0
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevision.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevision.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevision.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevision.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This class is used to store the revision and timestamp of a column family
+ * in a transaction.
+ *
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.7.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package org.apache.hcatalog.hbase.snapshot.transaction.thrift;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StoreFamilyRevision implements org.apache.thrift.TBase<StoreFamilyRevision, StoreFamilyRevision._Fields>, java.io.Serializable, Cloneable {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("StoreFamilyRevision");
+
+ private static final org.apache.thrift.protocol.TField REVISION_FIELD_DESC = new org.apache.thrift.protocol.TField("revision", org.apache.thrift.protocol.TType.I64, (short)1);
+ private static final org.apache.thrift.protocol.TField TIMESTAMP_FIELD_DESC = new org.apache.thrift.protocol.TField("timestamp", org.apache.thrift.protocol.TType.I64, (short)2);
+
+ public long revision; // required
+ public long timestamp; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ REVISION((short)1, "revision"),
+ TIMESTAMP((short)2, "timestamp");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if it's not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // REVISION
+ return REVISION;
+ case 2: // TIMESTAMP
+ return TIMESTAMP;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if it's not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ private static final int __REVISION_ISSET_ID = 0;
+ private static final int __TIMESTAMP_ISSET_ID = 1;
+ private BitSet __isset_bit_vector = new BitSet(2);
+
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.REVISION, new org.apache.thrift.meta_data.FieldMetaData("revision", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+ tmpMap.put(_Fields.TIMESTAMP, new org.apache.thrift.meta_data.FieldMetaData("timestamp", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(StoreFamilyRevision.class, metaDataMap);
+ }
+
+ public StoreFamilyRevision() {
+ }
+
+ public StoreFamilyRevision(
+ long revision,
+ long timestamp)
+ {
+ this();
+ this.revision = revision;
+ setRevisionIsSet(true);
+ this.timestamp = timestamp;
+ setTimestampIsSet(true);
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public StoreFamilyRevision(StoreFamilyRevision other) {
+ __isset_bit_vector.clear();
+ __isset_bit_vector.or(other.__isset_bit_vector);
+ this.revision = other.revision;
+ this.timestamp = other.timestamp;
+ }
+
+ public StoreFamilyRevision deepCopy() {
+ return new StoreFamilyRevision(this);
+ }
+
+ @Override
+ public void clear() {
+ setRevisionIsSet(false);
+ this.revision = 0;
+ setTimestampIsSet(false);
+ this.timestamp = 0;
+ }
+
+ public long getRevision() {
+ return this.revision;
+ }
+
+ public StoreFamilyRevision setRevision(long revision) {
+ this.revision = revision;
+ setRevisionIsSet(true);
+ return this;
+ }
+
+ public void unsetRevision() {
+ __isset_bit_vector.clear(__REVISION_ISSET_ID);
+ }
+
+ /** Returns true if field revision is set (has been assigned a value) and false otherwise */
+ public boolean isSetRevision() {
+ return __isset_bit_vector.get(__REVISION_ISSET_ID);
+ }
+
+ public void setRevisionIsSet(boolean value) {
+ __isset_bit_vector.set(__REVISION_ISSET_ID, value);
+ }
+
+ public long getTimestamp() {
+ return this.timestamp;
+ }
+
+ public StoreFamilyRevision setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ setTimestampIsSet(true);
+ return this;
+ }
+
+ public void unsetTimestamp() {
+ __isset_bit_vector.clear(__TIMESTAMP_ISSET_ID);
+ }
+
+ /** Returns true if field timestamp is set (has been assigned a value) and false otherwise */
+ public boolean isSetTimestamp() {
+ return __isset_bit_vector.get(__TIMESTAMP_ISSET_ID);
+ }
+
+ public void setTimestampIsSet(boolean value) {
+ __isset_bit_vector.set(__TIMESTAMP_ISSET_ID, value);
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case REVISION:
+ if (value == null) {
+ unsetRevision();
+ } else {
+ setRevision((Long)value);
+ }
+ break;
+
+ case TIMESTAMP:
+ if (value == null) {
+ unsetTimestamp();
+ } else {
+ setTimestamp((Long)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case REVISION:
+ return Long.valueOf(getRevision());
+
+ case TIMESTAMP:
+ return Long.valueOf(getTimestamp());
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case REVISION:
+ return isSetRevision();
+ case TIMESTAMP:
+ return isSetTimestamp();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof StoreFamilyRevision)
+ return this.equals((StoreFamilyRevision)that);
+ return false;
+ }
+
+ public boolean equals(StoreFamilyRevision that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_revision = true;
+ boolean that_present_revision = true;
+ if (this_present_revision || that_present_revision) {
+ if (!(this_present_revision && that_present_revision))
+ return false;
+ if (this.revision != that.revision)
+ return false;
+ }
+
+ boolean this_present_timestamp = true;
+ boolean that_present_timestamp = true;
+ if (this_present_timestamp || that_present_timestamp) {
+ if (!(this_present_timestamp && that_present_timestamp))
+ return false;
+ if (this.timestamp != that.timestamp)
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public int compareTo(StoreFamilyRevision other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ StoreFamilyRevision typedOther = (StoreFamilyRevision)other;
+
+ lastComparison = Boolean.valueOf(isSetRevision()).compareTo(typedOther.isSetRevision());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetRevision()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.revision, typedOther.revision);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetTimestamp()).compareTo(typedOther.isSetTimestamp());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetTimestamp()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.timestamp, typedOther.timestamp);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (field.id) {
+ case 1: // REVISION
+ if (field.type == org.apache.thrift.protocol.TType.I64) {
+ this.revision = iprot.readI64();
+ setRevisionIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 2: // TIMESTAMP
+ if (field.type == org.apache.thrift.protocol.TType.I64) {
+ this.timestamp = iprot.readI64();
+ setTimestampIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ oprot.writeFieldBegin(REVISION_FIELD_DESC);
+ oprot.writeI64(this.revision);
+ oprot.writeFieldEnd();
+ oprot.writeFieldBegin(TIMESTAMP_FIELD_DESC);
+ oprot.writeI64(this.timestamp);
+ oprot.writeFieldEnd();
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("StoreFamilyRevision(");
+ boolean first = true;
+
+ sb.append("revision:");
+ sb.append(this.revision);
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("timestamp:");
+ sb.append(this.timestamp);
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bit_vector = new BitSet(1);
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+}
+
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevisionList.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevisionList.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevisionList.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/gen-java/org/apache/hcatalog/hbase/snapshot/transaction/thrift/StoreFamilyRevisionList.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This class is used to store a list of StoreFamilyRevision for a column
+ * family in zookeeper.
+ *
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.7.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package org.apache.hcatalog.hbase.snapshot.transaction.thrift;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+ public class StoreFamilyRevisionList implements org.apache.thrift.TBase<StoreFamilyRevisionList, StoreFamilyRevisionList._Fields>, java.io.Serializable, Cloneable { // Thrift-generated struct with a single list-valued field, revisionList (field id 1)
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("StoreFamilyRevisionList");
+
+ private static final org.apache.thrift.protocol.TField REVISION_LIST_FIELD_DESC = new org.apache.thrift.protocol.TField("revisionList", org.apache.thrift.protocol.TType.LIST, (short)1);
+
+ public List<StoreFamilyRevision> revisionList; // labelled "required" by the generator, but registered as TFieldRequirementType.DEFAULT in metaDataMap and not enforced by validate(); null means "unset"
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ REVISION_LIST((short)1, "revisionList");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); // field name -> constant, filled by the static block below
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // REVISION_LIST
+ return REVISION_LIST;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ * The exception thrown is an IllegalArgumentException.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments (none here: the only field is an object reference whose set/unset state is its null-ness)
+
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; // unmodifiable field metadata, built once in the static block below
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.REVISION_LIST, new org.apache.thrift.meta_data.FieldMetaData("revisionList", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
+ new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, StoreFamilyRevision.class))));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(StoreFamilyRevisionList.class, metaDataMap);
+ }
+
+ public StoreFamilyRevisionList() { // leaves revisionList null, i.e. unset
+ }
+
+ public StoreFamilyRevisionList(
+ List<StoreFamilyRevision> revisionList)
+ {
+ this();
+ this.revisionList = revisionList; // keeps the caller's list by reference; no copy is made
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ * Each element is duplicated through the StoreFamilyRevision copy
+ * constructor; if the list of <i>other</i> is unset, it stays unset here.
+ */
+ public StoreFamilyRevisionList(StoreFamilyRevisionList other) {
+ if (other.isSetRevisionList()) {
+ List<StoreFamilyRevision> __this__revisionList = new ArrayList<StoreFamilyRevision>();
+ for (StoreFamilyRevision other_element : other.revisionList) {
+ __this__revisionList.add(new StoreFamilyRevision(other_element));
+ }
+ this.revisionList = __this__revisionList;
+ }
+ }
+
+ public StoreFamilyRevisionList deepCopy() { // delegates to the copy constructor above
+ return new StoreFamilyRevisionList(this);
+ }
+
+ @Override
+ public void clear() { // resets the struct to its unset state
+ this.revisionList = null;
+ }
+
+ public int getRevisionListSize() {
+ return (this.revisionList == null) ? 0 : this.revisionList.size(); // an unset list counts as size 0
+ }
+
+ public java.util.Iterator<StoreFamilyRevision> getRevisionListIterator() {
+ return (this.revisionList == null) ? null : this.revisionList.iterator(); // NOTE: returns null, not an empty iterator, when the list is unset
+ }
+
+ public void addToRevisionList(StoreFamilyRevision elem) {
+ if (this.revisionList == null) { // create the backing list on first use
+ this.revisionList = new ArrayList<StoreFamilyRevision>();
+ }
+ this.revisionList.add(elem);
+ }
+
+ public List<StoreFamilyRevision> getRevisionList() { // returns the internal list itself (or null when unset), not a copy
+ return this.revisionList;
+ }
+
+ public StoreFamilyRevisionList setRevisionList(List<StoreFamilyRevision> revisionList) { // returns this to allow call chaining
+ this.revisionList = revisionList;
+ return this;
+ }
+
+ public void unsetRevisionList() {
+ this.revisionList = null;
+ }
+
+ /** Returns true if field revisionList is set (has been assigned a value) and false otherwise */
+ public boolean isSetRevisionList() {
+ return this.revisionList != null;
+ }
+
+ public void setRevisionListIsSet(boolean value) { // false clears the field; true is a no-op
+ if (!value) {
+ this.revisionList = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) { // generic setter keyed by field constant; a null value unsets the field
+ switch (field) {
+ case REVISION_LIST:
+ if (value == null) {
+ unsetRevisionList();
+ } else {
+ setRevisionList((List<StoreFamilyRevision>)value); // unchecked cast: element types are not verified here
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) { // generic getter keyed by field constant
+ switch (field) {
+ case REVISION_LIST:
+ return getRevisionList();
+
+ }
+ throw new IllegalStateException(); // reached only if a _Fields constant is not handled by the switch above
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case REVISION_LIST:
+ return isSetRevisionList();
+ }
+ throw new IllegalStateException(); // reached only if a _Fields constant is not handled by the switch above
+ }
+
+ @Override
+ public boolean equals(Object that) { // type-checks, then delegates to the typed overload below
+ if (that == null)
+ return false;
+ if (that instanceof StoreFamilyRevisionList)
+ return this.equals((StoreFamilyRevisionList)that);
+ return false;
+ }
+
+ public boolean equals(StoreFamilyRevisionList that) { // equal when both lists are unset, or both are set and List.equals holds
+ if (that == null)
+ return false;
+
+ boolean this_present_revisionList = true && this.isSetRevisionList(); // "true &&" is a generator artifact with no effect
+ boolean that_present_revisionList = true && that.isSetRevisionList();
+ if (this_present_revisionList || that_present_revisionList) {
+ if (!(this_present_revisionList && that_present_revisionList))
+ return false;
+ if (!this.revisionList.equals(that.revisionList))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() { // NOTE(review): constant hash is consistent with equals(), but all instances collide in hash-based collections
+ return 0;
+ }
+
+ public int compareTo(StoreFamilyRevisionList other) { // orders by class name first, then unset-before-set, then by list contents
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ StoreFamilyRevisionList typedOther = (StoreFamilyRevisionList)other;
+
+ lastComparison = Boolean.valueOf(isSetRevisionList()).compareTo(typedOther.isSetRevisionList()); // false (unset) sorts before true (set)
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetRevisionList()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.revisionList, typedOther.revisionList);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) { // null when the id is unknown, see _Fields.findByThriftId
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { // populates this struct from the protocol, field by field, until a STOP marker
+ org.apache.thrift.protocol.TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == org.apache.thrift.protocol.TType.STOP) { // end-of-struct marker
+ break;
+ }
+ switch (field.id) {
+ case 1: // REVISION_LIST
+ if (field.type == org.apache.thrift.protocol.TType.LIST) {
+ {
+ org.apache.thrift.protocol.TList _list0 = iprot.readListBegin();
+ this.revisionList = new ArrayList<StoreFamilyRevision>(_list0.size); // replaces any existing list, pre-sized from the list header
+ for (int _i1 = 0; _i1 < _list0.size; ++_i1)
+ {
+ StoreFamilyRevision _elem2; // required
+ _elem2 = new StoreFamilyRevision();
+ _elem2.read(iprot);
+ this.revisionList.add(_elem2);
+ }
+ iprot.readListEnd();
+ }
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); // field id 1 with an unexpected wire type is skipped, not rejected
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); // unknown field ids are skipped
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method (validate() in this class has an empty body)
+ validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { // serializes this struct to the protocol
+ validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (this.revisionList != null) { // an unset list is omitted from the output entirely
+ oprot.writeFieldBegin(REVISION_LIST_FIELD_DESC);
+ {
+ oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, this.revisionList.size()));
+ for (StoreFamilyRevision _iter3 : this.revisionList)
+ {
+ _iter3.write(oprot);
+ }
+ oprot.writeListEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ @Override
+ public String toString() { // renders as "StoreFamilyRevisionList(revisionList:<list or null>)"
+ StringBuilder sb = new StringBuilder("StoreFamilyRevisionList(");
+ boolean first = true; // generator artifact: assigned but never read in this method
+
+ sb.append("revisionList:");
+ if (this.revisionList == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.revisionList);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException { // called at the start of write() and at the end of read()
+ // check for required fields: the body is empty, so nothing is checked and this call never fails
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { // Java serialization hook: writes the struct with the Thrift compact protocol
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te); // Thrift failures are reported as IOException with the original as cause
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { // Java serialization hook: reads the struct with the Thrift compact protocol
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te); // Thrift failures are reported as IOException with the original as cause
+ }
+ }
+
+}
+
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+
+/**
+ * A FamiliyRevision class consists of a revision number and a expiration
+ * timestamp. When a write transaction starts, the transaction
+ * object is appended to the transaction list of the each column
+ * family and stored in the corresponding znode. When a write transaction is
+ * committed, the transaction object is removed from the list.
+ */
+class FamilyRevision implements
+ Comparable<FamilyRevision> {
+
+ private long revision;
+
+ private long timestamp;
+
+ /**
+ * Create a FamilyRevision object
+ * @param rev revision number
+ * @param ts expiration timestamp
+ */
+ FamilyRevision(long rev, long ts) {
+ this.revision = rev;
+ this.timestamp = ts;
+ }
+
+ long getRevision() {
+ return revision;
+ }
+
+ long getExpireTimestamp() {
+ return timestamp;
+ }
+
+ void setExpireTimestamp(long ts) {
+ timestamp = ts;
+ }
+
+ @Override
+ public String toString() {
+ String description = "revision: " + revision + " ts: " + timestamp;
+ return description;
+ }
+
+ @Override
+ public int compareTo(FamilyRevision o) {
+ long d = revision - o.getRevision();
+ return (d < 0) ? -1 : (d > 0) ? 1 : 0;
+ }
+
+
+}
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/IDGenerator.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/IDGenerator.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/IDGenerator.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/IDGenerator.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hcatalog.hbase.snapshot.lock.LockListener;
+import org.apache.hcatalog.hbase.snapshot.lock.WriteLock;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * This class generates revision id's for transactions.
+ */
+class IDGenerator implements LockListener{
+
+ private ZooKeeper zookeeper;
+ private String zNodeDataLoc;
+ private String zNodeLockBasePath;
+ private long id;
+ private static final Log LOG = LogFactory.getLog(IDGenerator.class);
+
+ IDGenerator(ZooKeeper zookeeper, String tableName, String idGenNode)
+ throws IOException {
+ this.zookeeper = zookeeper;
+ this.zNodeDataLoc = idGenNode;
+ this.zNodeLockBasePath = PathUtil.getLockManagementNode(idGenNode);
+ }
+
+ /**
+ * This method obtains a revision id for a transaction.
+ *
+ * @return revision ID
+ * @throws IOException
+ */
+ public long obtainID() throws IOException{
+ WriteLock wLock = new WriteLock(zookeeper, zNodeLockBasePath, Ids.OPEN_ACL_UNSAFE);
+ wLock.setLockListener(this);
+ try {
+ boolean lockGrabbed = wLock.lock();
+ if (lockGrabbed == false) {
+ //TO DO : Let this request queue up and try obtaining lock.
+ throw new IOException("Unable to obtain lock to obtain id.");
+ } else {
+ id = incrementAndReadCounter();
+ }
+ } catch (KeeperException e) {
+ LOG.warn("Exception while obtaining lock for ID.", e);
+ throw new IOException("Exception while obtaining lock for ID.", e);
+ } catch (InterruptedException e) {
+ LOG.warn("Exception while obtaining lock for ID.", e);
+ throw new IOException("Exception while obtaining lock for ID.", e);
+ } finally {
+ wLock.unlock();
+ }
+ return id;
+ }
+
+ /**
+ * This method reads the latest revision ID that has been used. The ID
+ * returned by this method cannot be used for transaction.
+ * @return revision ID
+ * @throws IOException
+ */
+ public long readID() throws IOException{
+ long curId;
+ try {
+ Stat stat = new Stat();
+ byte[] data = zookeeper.getData(this.zNodeDataLoc, false, stat);
+ curId = Long.parseLong(new String(data,Charset.forName("UTF-8")));
+ } catch (KeeperException e) {
+ LOG.warn("Exception while reading current revision id.", e);
+ throw new IOException("Exception while reading current revision id.", e);
+ } catch (InterruptedException e) {
+ LOG.warn("Exception while reading current revision id.", e);
+ throw new IOException("Exception while reading current revision id.",e);
+ }
+
+ return curId;
+ }
+
+
+ private long incrementAndReadCounter() throws IOException{
+
+ long curId, usedId;
+ try {
+ Stat stat = new Stat();
+ byte[] data = zookeeper.getData(this.zNodeDataLoc, false, stat);
+ usedId = Long.parseLong((new String(data,Charset.forName("UTF-8"))));
+ curId = usedId +1;
+ String lastUsedID = String.valueOf(curId);
+ zookeeper.setData(this.zNodeDataLoc, lastUsedID.getBytes(Charset.forName("UTF-8")), -1 );
+
+ } catch (KeeperException e) {
+ LOG.warn("Exception while incrementing revision id.", e);
+ throw new IOException("Exception while incrementing revision id. ", e);
+ } catch (InterruptedException e) {
+ LOG.warn("Exception while incrementing revision id.", e);
+ throw new IOException("Exception while incrementing revision id. ", e);
+ }
+
+ return curId;
+ }
+
+ /*
+ * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockAcquired()
+ */
+ @Override
+ public void lockAcquired() {
+
+
+ }
+
+ /*
+ * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockReleased()
+ */
+ @Override
+ public void lockReleased() {
+
+ }
+
+
+}
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/PathUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/PathUtil.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/PathUtil.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/PathUtil.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+
+/**
+ * The PathUtil class is a utility class to provide information about various
+ * znode paths. The following is the znode structure used for storing information.
+ * baseDir/ClockNode
+ * baseDir/TrasactionBasePath
+ * baseDir/TrasactionBasePath/TableA/revisionID
+ * baseDir/TrasactionBasePath/TableA/columnFamily-1
+ * baseDir/TrasactionBasePath/TableA/columnFamily-1/runnningTxns
+ * baseDir/TrasactionBasePath/TableA/columnFamily-1/abortedTxns
+ * baseDir/TrasactionBasePath/TableB/revisionID
+ * baseDir/TrasactionBasePath/TableB/columnFamily-1
+ * baseDir/TrasactionBasePath/TableB/columnFamily-1/runnningTxns
+ * baseDir/TrasactionBasePath/TableB/columnFamily-1/abortedTxns
+
+ */
+public class PathUtil{
+
+ static final String DATA_DIR = "/data";
+ static final String CLOCK_NODE = "/clock";
+
+ /**
+ * This method returns the data path associated with the currently
+ * running transactions of a given table and column/column family.
+ * @param baseDir
+ * @param tableName
+ * @param columnFamily
+ * @return The path of the running transactions data.
+ */
+ static String getRunningTxnInfoPath(String baseDir, String tableName,
+ String columnFamily) {
+ String txnBasePath = getTransactionBasePath(baseDir);
+ String path = txnBasePath + "/" + tableName + "/" + columnFamily
+ + "/runningTxns";
+ return path;
+ }
+
+ /**
+ * This method returns the data path associated with the aborted
+ * transactions of a given table and column/column family.
+ * @param baseDir The base directory for revision management.
+ * @param tableName The name of the table.
+ * @param columnFamily
+ * @return The path of the aborted transactions data.
+ */
+ static String getAbortInformationPath(String baseDir, String tableName,
+ String columnFamily) {
+ String txnBasePath = getTransactionBasePath(baseDir);
+ String path = txnBasePath + "/" + tableName + "/" + columnFamily
+ + "/abortData";
+ return path;
+ }
+
+ /**
+ * Gets the revision id node for a given table.
+ *
+ * @param baseDir the base dir for revision management.
+ * @param tableName the table name
+ * @return the revision id node path.
+ */
+ static String getRevisionIDNode(String baseDir, String tableName) {
+ String rmBasePath = getTransactionBasePath(baseDir);
+ String revisionIDNode = rmBasePath + "/" + tableName + "/idgen";
+ return revisionIDNode;
+ }
+
+ /**
+ * Gets the lock management node for any znode that needs to be locked.
+ *
+ * @param path the path of the znode.
+ * @return the lock management node path.
+ */
+ static String getLockManagementNode(String path) {
+ String lockNode = path + "_locknode_";
+ return lockNode;
+ }
+
+ /**
+ * This method returns the base path for the transaction data.
+ *
+ * @param baseDir The base dir for revision management.
+ * @return The base path for the transaction data.
+ */
+ static String getTransactionBasePath(String baseDir) {
+ String txnBaseNode = baseDir + DATA_DIR;
+ return txnBaseNode;
+ }
+
+ /**
+ * Gets the txn data path for a given table.
+ *
+ * @param baseDir the base dir for revision management.
+ * @param tableName the table name
+ * @return the txn data path for the table.
+ */
+ static String getTxnDataPath(String baseDir, String tableName){
+ String txnBasePath = getTransactionBasePath(baseDir);
+ String path = txnBasePath + "/" + tableName;
+ return path;
+ }
+
+ /**
+ * This method returns the data path for clock node.
+ *
+ * @param baseDir
+ * @return The data path for clock.
+ */
+ static String getClockPath(String baseDir) {
+ String clockNode = baseDir + CLOCK_NODE;
+ return clockNode;
+ }
+}
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * This interface provides APIs for implementing revision management.
+ */
+public interface RevisionManager {
+
+ public static final String REVISION_MGR_IMPL_CLASS = "revision.manager.impl.class";
+
+ /**
+ * Initialize the revision manager.
+ */
+ public void initialize(Properties properties);
+
+ /**
+ * Opens the revision manager.
+ *
+ * @throws IOException
+ */
+ public void open() throws IOException;
+
+ /**
+ * Closes the revision manager.
+ *
+ * @throws IOException
+ */
+ public void close() throws IOException;
+
+ /**
+ * Start the write transaction.
+ *
+ * @param table
+ * @param families
+ * @return
+ * @throws IOException
+ */
+ public Transaction beginWriteTransaction(String table, List<String> families)
+ throws IOException;
+
+ /**
+ * Start the write transaction.
+ *
+ * @param table
+ * @param families
+ * @param keepAlive
+ * @return
+ * @throws IOException
+ */
+ public Transaction beginWriteTransaction(String table,
+ List<String> families, long keepAlive) throws IOException;
+
+ /**
+ * Commit the write transaction.
+ *
+ * @param transaction
+ * @throws IOException
+ */
+ public void commitWriteTransaction(Transaction transaction)
+ throws IOException;
+
+ /**
+ * Abort the write transaction.
+ *
+ * @param transaction
+ * @throws IOException
+ */
+ public void abortWriteTransaction(Transaction transaction)
+ throws IOException;
+
+ /**
+ * Create the latest snapshot of the table.
+ *
+ * @param tableName
+ * @return
+ * @throws IOException
+ */
+ public TableSnapshot createSnapshot(String tableName) throws IOException;
+
+ /**
+ * Create the snapshot of the table using the revision number.
+ *
+ * @param tableName
+ * @param revision
+ * @return
+ * @throws IOException
+ */
+ public TableSnapshot createSnapshot(String tableName, long revision)
+ throws IOException;
+
+ /**
+ * Extends the expiration of a transaction by the time indicated by keep alive.
+ *
+ * @param transaction
+ * @throws IOException
+ */
+ public void keepAlive(Transaction transaction) throws IOException;
+
+}
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.io.IOException;
+import java.util.Properties;
+
+ /**
+ * Factory that instantiates the {@link RevisionManager} implementation named
+ * by the {@link RevisionManager#REVISION_MGR_IMPL_CLASS} property.
+ */
+ public class RevisionManagerFactory {
+
+ /**
+ * Gets an instance of revision manager. The implementation class is taken
+ * from the {@link RevisionManager#REVISION_MGR_IMPL_CLASS} property and
+ * defaults to ZKBasedRevisionManager; it is loaded through the thread
+ * context class loader (or this class's loader when none is set), created
+ * with its no-argument constructor and then initialized with the properties.
+ *
+ * @param properties The properties required to created the revision manager.
+ * @return the revision manager An instance of revision manager.
+ * @throws IOException if the class cannot be found, does not implement
+ * RevisionManager, or cannot be instantiated.
+ */
+ public static RevisionManager getRevisionManager(Properties properties) throws IOException{
+
+ ClassLoader classLoader = Thread.currentThread()
+ .getContextClassLoader();
+ if (classLoader == null) {
+ classLoader = RevisionManagerFactory.class.getClassLoader();
+ }
+ String className = properties.getProperty(
+ RevisionManager.REVISION_MGR_IMPL_CLASS,
+ ZKBasedRevisionManager.class.getName());
+
+ // Resolve the class with a checked narrowing: a configured class that is
+ // not a RevisionManager is reported as an IOException here instead of
+ // escaping later as a raw ClassCastException.
+ Class<? extends RevisionManager> revisionMgrClass;
+ try {
+ revisionMgrClass = Class.forName(className, true, classLoader)
+ .asSubclass(RevisionManager.class);
+ } catch (ClassNotFoundException e) {
+ throw new IOException(
+ "The implementation class of revision manager not found.",
+ e);
+ } catch (ClassCastException e) {
+ throw new IOException(
+ "The configured revision manager class " + className
+ + " does not implement " + RevisionManager.class.getName() + ".",
+ e);
+ }
+
+ RevisionManager revisionMgr;
+ try {
+ revisionMgr = revisionMgrClass.newInstance();
+ revisionMgr.initialize(properties);
+ } catch (InstantiationException e) {
+ throw new IOException(
+ "Exception encountered during instantiating revision manager implementation.",
+ e);
+ } catch (IllegalAccessException e) {
+ throw new IOException(
+ "IllegalAccessException encountered during instantiating revision manager implementation.",
+ e);
+ } catch (IllegalArgumentException e) {
+ throw new IOException(
+ "IllegalArgumentException encountered during instantiating revision manager implementation.",
+ e);
+ }
+ return revisionMgr;
+ }
+
+ }
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The snapshot for a table and a list of column families.
+ */
+public class TableSnapshot {
+
+ private String name;
+
+ private Map<String, Long> cfRevisionMap;
+
+
+ public TableSnapshot(String name, Map<String, Long> cfRevMap) {
+ this.name = name;
+ this.cfRevisionMap = cfRevMap;
+ }
+
+ /**
+ * Gets the table name.
+ *
+ * @return String The name of the table.
+ */
+ public String getTableName() {
+ return name;
+ }
+
+ /**
+ * Gets the column families.
+ *
+ * @return List<String> A list of column families associated with the snapshot.
+ */
+ public List<String> getColumnFamilies(){
+ return new ArrayList<String>(this.cfRevisionMap.keySet());
+ }
+
+ /**
+ * Gets the revision.
+ *
+ * @param familyName The name of the column family.
+ * @return the revision
+ */
+ public long getRevision(String familyName){
+ return this.cfRevisionMap.get(familyName);
+ }
+
+ @Override
+ public String toString() {
+ String snapshot = "Table Name : " + name
+ + " Column Familiy revision : " + cfRevisionMap.toString();
+ return snapshot;
+ }
+}
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class is responsible for storing information related to
+ * transactions.
+ */
+public class Transaction implements Serializable {
+
+ private String tableName;
+ private List<String> columnFamilies = new ArrayList<String>();
+ private long timeStamp;
+ private long keepAlive;
+ private long revision;
+
+
+ Transaction(String tableName, List<String> columnFamilies, long revision, long timestamp) {
+ this.tableName = tableName;
+ this.columnFamilies = columnFamilies;
+ this.timeStamp = timestamp;
+ this.revision = revision;
+ }
+
+ /**
+ * @return The revision number associated with a transaction.
+ */
+ public long getRevisionNumber(){
+ return this.revision;
+ }
+
+ /**
+ * @return The table name associated with a transaction.
+ */
+ public String getTableName() {
+ return tableName;
+ }
+
+ /**
+ * @return The column families associated with a transaction.
+ */
+ public List<String> getColumnFamilies() {
+ return columnFamilies;
+ }
+
+ /**
+ * @return The expire timestamp associated with a transaction.
+ */
+ long getTransactionExpireTimeStamp(){
+ return this.timeStamp + this.keepAlive;
+ }
+
+ void setKeepAlive(long seconds){
+ this.keepAlive = seconds;
+ }
+
+ /**
+ * Gets the keep alive value.
+ *
+ * @return long The keep alive value for the transaction.
+ */
+ public long getKeepAliveValue(){
+ return this.keepAlive;
+ }
+
+ /**
+ * Gets the family revision info.
+ *
+ * @return FamilyRevision An instance of FamilyRevision associated with the transaction.
+ */
+ FamilyRevision getFamilyRevisionInfo(){
+ return new FamilyRevision(revision, getTransactionExpireTimeStamp());
+ }
+
+ /**
+ * Keep alive transaction. This methods extends the expire timestamp of a
+ * transaction by the "keep alive" amount.
+ */
+ void keepAliveTransaction(){
+ this.timeStamp = this.timeStamp + this.keepAlive;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Revision : ");
+ sb.append(this.getRevisionNumber());
+ sb.append(" Timestamp : ");
+ sb.append(this.getTransactionExpireTimeStamp());
+ sb.append("\n").append("Table : ");
+ sb.append(this.tableName).append("\n");
+ sb.append("Column Families : ");
+ sb.append(this.columnFamilies.toString());
+ return sb.toString();
+ }
+}
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hcatalog.hbase.snapshot.lock.LockListener;
+import org.apache.hcatalog.hbase.snapshot.lock.WriteLock;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+
+/**
+ * The service for providing revision management to Hbase tables.
+ */
+public class ZKBasedRevisionManager implements RevisionManager{
+
+ public static final String HOSTLIST = "revision.manager.zk.HostList";
+ public static final String DATADIR = "revision.manager.zk.DataDir";
+ private static int DEFAULT_WRITE_TRANSACTION_TIMEOUT = 14400000;
+ private static final Log LOG = LogFactory.getLog(ZKBasedRevisionManager.class);
+ private String zkHostList;
+ private String baseDir;
+ private ZKUtil zkUtil;
+
+
+ /*
+ * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#initialize()
+ */
+ @Override
+ public void initialize(Properties properties) {
+ this.zkHostList = properties.getProperty(ZKBasedRevisionManager.HOSTLIST, "localhost:2181");
+ this.baseDir = properties.getProperty(ZKBasedRevisionManager.DATADIR,"/revision-management");
+ }
+
+ /**
+ * Open a ZooKeeper connection
+ * @throws java.io.IOException
+ */
+
+ public void open() throws IOException {
+ zkUtil = new ZKUtil(zkHostList, this.baseDir);
+ zkUtil.createRootZNodes();
+ LOG.info("Created root znodes for revision manager.");
+ }
+
+ /**
+ * Close Zookeeper connection
+ */
+ public void close() {
+ zkUtil.closeZKConnection();
+ }
+
+ private void checkInputParams(String table, List<String> families) {
+ if (table == null) {
+ throw new IllegalArgumentException(
+ "The table name must be specified for reading.");
+ }
+ if (families == null || families.isEmpty()) {
+ throw new IllegalArgumentException(
+ "At least one column family should be specified for reading.");
+ }
+ }
+
+
+ /* @param table
+ /* @param families
+ /* @param keepAlive
+ /* @return
+ /* @throws IOException
+ * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#beginWriteTransaction(java.lang.String, java.util.List, long)
+ */
+ public Transaction beginWriteTransaction(String table,
+ List<String> families, long keepAlive) throws IOException {
+
+ checkInputParams(table, families);
+ zkUtil.setUpZnodesForTable(table, families);
+ long nextId = zkUtil.nextId(table);
+ long expireTimestamp = zkUtil.getTimeStamp();
+ Transaction transaction = new Transaction(table, families, nextId,
+ expireTimestamp);
+ if (keepAlive != -1) {
+ transaction.setKeepAlive(keepAlive);
+ } else {
+ transaction.setKeepAlive(DEFAULT_WRITE_TRANSACTION_TIMEOUT);
+ }
+
+ refreshTransactionList(transaction.getTableName());
+ String lockPath = prepareLockNode(table);
+ WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
+ Ids.OPEN_ACL_UNSAFE);
+ RMLockListener myLockListener = new RMLockListener();
+ wLock.setLockListener(myLockListener);
+ try {
+ boolean lockGrabbed = wLock.lock();
+ if (lockGrabbed == false) {
+ //TO DO : Let this request queue up and try obtaining lock.
+ throw new IOException(
+ "Unable to obtain lock while beginning transaction. "
+ + transaction.toString());
+ } else {
+ List<String> colFamilies = transaction.getColumnFamilies();
+ FamilyRevision revisionData = transaction.getFamilyRevisionInfo();
+ for (String cfamily : colFamilies) {
+ String path = PathUtil.getRunningTxnInfoPath(
+ baseDir, table, cfamily);
+ zkUtil.updateData(path, revisionData,
+ ZKUtil.UpdateMode.APPEND);
+ }
+ }
+ } catch (KeeperException e) {
+ throw new IOException("Exception while obtaining lock.", e);
+ } catch (InterruptedException e) {
+ throw new IOException("Exception while obtaining lock.", e);
+ }
+ finally {
+ wLock.unlock();
+ }
+
+ return transaction;
+ }
+
+ /* @param table The table name.
+ /* @param families The column families involved in the transaction.
+ /* @return transaction The transaction which was started.
+ /* @throws IOException
+ * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#beginWriteTransaction(java.lang.String, java.util.List)
+ */
+ public Transaction beginWriteTransaction(String table, List<String> families)
+ throws IOException {
+ return beginWriteTransaction(table, families, -1);
+ }
+
+ /**
+ * This method commits a write transaction.
+ * @param transaction The revision information associated with transaction.
+ * @throws java.io.IOException
+ */
+ public void commitWriteTransaction(Transaction transaction) throws IOException {
+ refreshTransactionList(transaction.getTableName());
+
+ String lockPath = prepareLockNode(transaction.getTableName());
+ WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
+ Ids.OPEN_ACL_UNSAFE);
+ RMLockListener myLockListener = new RMLockListener();
+ wLock.setLockListener(myLockListener);
+ try {
+ boolean lockGrabbed = wLock.lock();
+ if (lockGrabbed == false) {
+ //TO DO : Let this request queue up and try obtaining lock.
+ throw new IOException(
+ "Unable to obtain lock while commiting transaction. "
+ + transaction.toString());
+ } else {
+ String tableName = transaction.getTableName();
+ List<String> colFamilies = transaction.getColumnFamilies();
+ FamilyRevision revisionData = transaction.getFamilyRevisionInfo();
+ for (String cfamily : colFamilies) {
+ String path = PathUtil.getRunningTxnInfoPath(
+ baseDir, tableName, cfamily);
+ zkUtil.updateData(path, revisionData,
+ ZKUtil.UpdateMode.REMOVE);
+ }
+
+ }
+ } catch (KeeperException e) {
+ throw new IOException("Exception while obtaining lock.", e);
+ } catch (InterruptedException e) {
+ throw new IOException("Exception while obtaining lock.", e);
+ }
+ finally {
+ wLock.unlock();
+ }
+ LOG.info("Write Transaction committed: " + transaction.toString());
+ }
+
+ /**
+ * This method aborts a write transaction.
+ * @param state the state associated with the Transaction
+ * @throws java.io.IOException
+ */
+ public void abortWriteTransaction(Transaction transaction) throws IOException {
+
+ refreshTransactionList(transaction.getTableName());
+ String lockPath = prepareLockNode(transaction.getTableName());
+ WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
+ Ids.OPEN_ACL_UNSAFE);
+ RMLockListener myLockListener = new RMLockListener();
+ wLock.setLockListener(myLockListener);
+ try {
+ boolean lockGrabbed = wLock.lock();
+ if (lockGrabbed == false) {
+ //TO DO : Let this request queue up and try obtaining lock.
+ throw new IOException(
+ "Unable to obtain lock while aborting transaction. "
+ + transaction.toString());
+ } else {
+ String tableName = transaction.getTableName();
+ List<String> colFamilies = transaction.getColumnFamilies();
+ FamilyRevision revisionData = transaction
+ .getFamilyRevisionInfo();
+ for (String cfamily : colFamilies) {
+ String path = PathUtil.getRunningTxnInfoPath(
+ baseDir, tableName, cfamily);
+ zkUtil.updateData(path, revisionData,
+ ZKUtil.UpdateMode.REMOVE);
+ path = PathUtil.getAbortInformationPath(baseDir,
+ tableName, cfamily);
+ zkUtil.updateData(path, revisionData,
+ ZKUtil.UpdateMode.APPEND);
+ }
+
+ }
+ } catch (KeeperException e) {
+ throw new IOException("Exception while obtaining lock.", e);
+ } catch (InterruptedException e) {
+ throw new IOException("Exception while obtaining lock.", e);
+ }
+ finally {
+ wLock.unlock();
+ }
+ LOG.info("Write Transaction aborted: " + transaction.toString());
+ }
+
+
+ /* @param transaction
+ /* @throws IOException
+ * @see org.apache.hcatalog.hbase.snapshot.RevsionManager#keepAlive(org.apache.hcatalog.hbase.snapshot.Transaction)
+ */
+ public void keepAlive(Transaction transaction)
+ throws IOException {
+
+ refreshTransactionList(transaction.getTableName());
+ transaction.keepAliveTransaction();
+ String lockPath = prepareLockNode(transaction.getTableName());
+ WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
+ Ids.OPEN_ACL_UNSAFE);
+ RMLockListener myLockListener = new RMLockListener();
+ wLock.setLockListener(myLockListener);
+ try {
+ boolean lockGrabbed = wLock.lock();
+ if (lockGrabbed == false) {
+ //TO DO : Let this request queue up and try obtaining lock.
+ throw new IOException(
+ "Unable to obtain lock for keep alive of transaction. "
+ + transaction.toString());
+ }else {
+ String tableName = transaction.getTableName();
+ List<String> colFamilies = transaction.getColumnFamilies();
+ FamilyRevision revisionData = transaction.getFamilyRevisionInfo();
+ for (String cfamily : colFamilies) {
+ String path = PathUtil.getRunningTxnInfoPath(
+ baseDir, tableName, cfamily);
+ zkUtil.updateData(path, revisionData,
+ ZKUtil.UpdateMode.KEEP_ALIVE);
+ }
+
+ }
+ } catch (KeeperException e) {
+ throw new IOException("Exception while obtaining lock.", e);
+ } catch (InterruptedException e) {
+ throw new IOException("Exception while obtaining lock.", e);
+ }finally {
+ wLock.unlock();
+ }
+
+ }
+
+ /* This method allows the user to create latest snapshot of a
+ /* table.
+ /* @param tableName The table whose snapshot is being created.
+ /* @return TableSnapshot An instance of TableSnaphot
+ /* @throws IOException
+ * @see org.apache.hcatalog.hbase.snapshot.RevsionManager#createSnapshot(java.lang.String)
+ */
+ public TableSnapshot createSnapshot(String tableName) throws IOException{
+ refreshTransactionList(tableName);
+ long latestID = zkUtil.currentID(tableName);
+ HashMap<String, Long> cfMap = new HashMap<String, Long>();
+ List<String> columnFamilyNames = zkUtil.getColumnFamiliesOfTable(tableName);
+
+ for(String cfName: columnFamilyNames){
+ String cfPath = PathUtil.getRunningTxnInfoPath(baseDir, tableName, cfName);
+ List<FamilyRevision> tranxList = zkUtil.getTransactionList(cfPath);
+ long version;
+ if (!tranxList.isEmpty()) {
+ Collections.sort(tranxList);
+ // get the smallest running Transaction ID
+ long runningVersion = tranxList.get(0).getRevision();
+ version = runningVersion -1;
+ } else {
+ version = latestID;
+ }
+ cfMap.put(cfName, version);
+ }
+
+ return new TableSnapshot(tableName, cfMap);
+ }
+
+ /* This method allows the user to create snapshot of a
+ /* table with a given revision number.
+ /* @param tableName
+ /* @param revision
+ /* @return TableSnapshot
+ /* @throws IOException
+ * @see org.apache.hcatalog.hbase.snapshot.RevsionManager#createSnapshot(java.lang.String, long)
+ */
+ public TableSnapshot createSnapshot(String tableName, long revision) throws IOException{
+
+ long currentID = zkUtil.currentID(tableName);
+ if (revision > currentID) {
+ throw new IOException(
+ "The revision specified in the snapshot is higher than the current revision of the table.");
+ }
+ refreshTransactionList(tableName);
+ HashMap<String, Long> cfMap = new HashMap<String, Long>();
+ List<String> columnFamilies = zkUtil.getColumnFamiliesOfTable(tableName);
+
+ for(String cf: columnFamilies){
+ cfMap.put(cf, revision);
+ }
+
+ return new TableSnapshot(tableName, cfMap);
+ }
+
+ /**
+ * Get the list of in-progress Transactions for a column family
+ * @param table the table name
+ * @param columnFamily the column family name
+ * @return a list of in-progress WriteTransactions
+ * @throws java.io.IOException
+ */
+ List<FamilyRevision> getRunningTransactions(String table,
+ String columnFamily) throws IOException {
+ String path = PathUtil.getRunningTxnInfoPath(baseDir, table,
+ columnFamily);
+ return zkUtil.getTransactionList(path);
+ }
+
+ /**
+ * Get the list of aborted Transactions for a column family
+ * @param table the table name
+ * @param columnFamily the column family name
+ * @return a list of aborted WriteTransactions
+ * @throws java.io.IOException
+ */
+ List<FamilyRevision> getAbortedWriteTransactions(String table,
+ String columnFamily) throws IOException {
+ String path = PathUtil.getAbortInformationPath(baseDir, table, columnFamily);
+ return zkUtil.getTransactionList(path);
+ }
+
+ private void refreshTransactionList(String tableName) throws IOException{
+ String lockPath = prepareLockNode(tableName);
+ WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
+ Ids.OPEN_ACL_UNSAFE);
+ RMLockListener myLockListener = new RMLockListener();
+ wLock.setLockListener(myLockListener);
+ try {
+ boolean lockGrabbed = wLock.lock();
+ if (lockGrabbed == false) {
+ //TO DO : Let this request queue up and try obtaining lock.
+ throw new IOException(
+ "Unable to obtain lock while refreshing transactions of table "
+ + tableName + ".");
+ }else {
+ List<String> cfPaths = zkUtil
+ .getColumnFamiliesOfTable(tableName);
+ for (String cf : cfPaths) {
+ String runningDataPath = PathUtil.getRunningTxnInfoPath(
+ baseDir, tableName, cf);
+ zkUtil.refreshTransactions(runningDataPath);
+ }
+
+ }
+ } catch (KeeperException e) {
+ throw new IOException("Exception while obtaining lock.", e);
+ } catch (InterruptedException e) {
+ throw new IOException("Exception while obtaining lock.", e);
+ } finally {
+ wLock.unlock();
+ }
+
+ }
+
+ private String prepareLockNode(String tableName) throws IOException{
+ String txnDataPath = PathUtil.getTxnDataPath(this.baseDir, tableName);
+ String lockPath = PathUtil.getLockManagementNode(txnDataPath);
+ zkUtil.ensurePathExists(lockPath, null, Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ return lockPath;
+ }
+
+
+ /*
+ * This class is a listener class for the locks used in revision management.
+ * TBD: Use the following class to signal that that the lock is actually
+ * been granted.
+ */
+ class RMLockListener implements LockListener {
+
+ /*
+ * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockAcquired()
+ */
+ @Override
+ public void lockAcquired() {
+
+ }
+
+ /*
+ * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockReleased()
+ */
+ @Override
+ public void lockReleased() {
+
+ }
+
+ }
+
+
+}