You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2013/11/13 17:47:12 UTC

svn commit: r1541609 [3/4] - in /hive/trunk: ./ cli/ hbase-handler/ hbase-handler/src/java/org/apache/hadoop/hive/hbase/ hbase-handler/src/test/org/apache/hadoop/hive/hbase/ hbase-handler/src/test/queries/negative/ hcatalog/ hcatalog/build-support/ant/...

Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java Wed Nov 13 16:47:11 2013
@@ -25,6 +25,7 @@ import java.util.Properties;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hive.hbase.PutWritable;
 import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.io.Writable;
@@ -37,9 +38,12 @@ import org.apache.hcatalog.common.HCatCo
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
 
-public class HBaseBaseOutputFormat implements OutputFormat<WritableComparable<?>, Put>,
-  HiveOutputFormat<WritableComparable<?>, Put> {
-
+/**
+ * Children of this output format can be passed either Puts (typical) or
+ * PutWritables. PutWritables come from Hive's HBase SerDe.
+ */
+public class HBaseBaseOutputFormat implements OutputFormat<WritableComparable<?>, Object>,
+  HiveOutputFormat<WritableComparable<?>, Object> {
   @Override
   public FSRecordWriter getHiveRecordWriter(
     JobConf jc, Path finalOutPath,
@@ -51,22 +55,34 @@ public class HBaseBaseOutputFormat imple
 
   @Override
   public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
-    OutputFormat<WritableComparable<?>, Put> outputFormat = getOutputFormat(job);
+    OutputFormat<WritableComparable<?>, Object> outputFormat = getOutputFormat(job);
     outputFormat.checkOutputSpecs(ignored, job);
   }
 
   @Override
-  public RecordWriter<WritableComparable<?>, Put> getRecordWriter(FileSystem ignored,
+  public RecordWriter<WritableComparable<?>, Object> getRecordWriter(FileSystem ignored,
                                   JobConf job, String name, Progressable progress) throws IOException {
-    OutputFormat<WritableComparable<?>, Put> outputFormat = getOutputFormat(job);
+    HBaseHCatStorageHandler.setHBaseSerializers(job);
+    OutputFormat<WritableComparable<?>, Object> outputFormat = getOutputFormat(job);
     return outputFormat.getRecordWriter(ignored, job, name, progress);
   }
 
-  private OutputFormat<WritableComparable<?>, Put> getOutputFormat(JobConf job)
+  /** Unwraps a Put or a PutWritable; anything else, including null, is rejected. */
+  protected static Put toPut(Object o) {
+    if (o instanceof Put) {
+      return (Put) o;
+    }
+    if (o instanceof PutWritable) {
+      return ((PutWritable) o).getPut();
+    }
+    throw new IllegalArgumentException("Illegal Argument " + (o == null ? "null" : o.getClass().getName()));
+  }
+
+  private OutputFormat<WritableComparable<?>, Object> getOutputFormat(JobConf job)
     throws IOException {
     String outputInfo = job.get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
     OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(outputInfo);
-    OutputFormat<WritableComparable<?>, Put> outputFormat = null;
+    OutputFormat<WritableComparable<?>, Object> outputFormat = null;
     if (HBaseHCatStorageHandler.isBulkMode(outputJobInfo)) {
       outputFormat = new HBaseBulkOutputFormat();
     } else {

Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java Wed Nov 13 16:47:11 2013
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -53,10 +54,10 @@ class HBaseBulkOutputFormat extends HBas
 
   private final static ImmutableBytesWritable EMPTY_LIST = new ImmutableBytesWritable(
     new byte[0]);
-  private SequenceFileOutputFormat<WritableComparable<?>, Put> baseOutputFormat;
+  private SequenceFileOutputFormat<WritableComparable<?>, Object> baseOutputFormat;
 
   public HBaseBulkOutputFormat() {
-    baseOutputFormat = new SequenceFileOutputFormat<WritableComparable<?>, Put>();
+    baseOutputFormat = new SequenceFileOutputFormat<WritableComparable<?>, Object>();
   }
 
   @Override
@@ -68,9 +69,10 @@ class HBaseBulkOutputFormat extends HBas
   }
 
   @Override
-  public RecordWriter<WritableComparable<?>, Put> getRecordWriter(
+  public RecordWriter<WritableComparable<?>, Object> getRecordWriter(
     FileSystem ignored, JobConf job, String name, Progressable progress)
     throws IOException {
+    HBaseHCatStorageHandler.setHBaseSerializers(job);
     job.setOutputKeyClass(ImmutableBytesWritable.class);
     job.setOutputValueClass(Put.class);
     long version = HBaseRevisionManagerUtil.getOutputRevision(job);
@@ -93,26 +95,28 @@ class HBaseBulkOutputFormat extends HBas
   }
 
   private static class HBaseBulkRecordWriter implements
-    RecordWriter<WritableComparable<?>, Put> {
+    RecordWriter<WritableComparable<?>, Object> {
 
-    private RecordWriter<WritableComparable<?>, Put> baseWriter;
+    private RecordWriter<WritableComparable<?>, Object> baseWriter;
     private final Long outputVersion;
 
     public HBaseBulkRecordWriter(
-      RecordWriter<WritableComparable<?>, Put> baseWriter,
+      RecordWriter<WritableComparable<?>, Object> baseWriter,
       Long outputVersion) {
       this.baseWriter = baseWriter;
       this.outputVersion = outputVersion;
     }
 
     @Override
-    public void write(WritableComparable<?> key, Put value)
+    public void write(WritableComparable<?> key, Object value)
       throws IOException {
-      Put put = value;
+      Put original = toPut(value);
+      Put put = original;
       if (outputVersion != null) {
-        put = new Put(value.getRow(), outputVersion.longValue());
-        for (List<KeyValue> row : value.getFamilyMap().values()) {
-          for (KeyValue el : row) {
+        put = new Put(original.getRow(), outputVersion.longValue());
+        for (List<? extends Cell> row : original.getFamilyMap().values()) {
+          for (Cell cell : row) {
+            KeyValue el = (KeyValue)cell;
             put.add(el.getFamily(), el.getQualifier(), el.getValue());
           }
         }

Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java Wed Nov 13 16:47:11 2013
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.mapred.TableOutputFormat;
@@ -53,9 +54,10 @@ class HBaseDirectOutputFormat extends HB
   }
 
   @Override
-  public RecordWriter<WritableComparable<?>, Put> getRecordWriter(FileSystem ignored,
+  public RecordWriter<WritableComparable<?>, Object> getRecordWriter(FileSystem ignored,
                                   JobConf job, String name, Progressable progress)
     throws IOException {
+    HBaseHCatStorageHandler.setHBaseSerializers(job);
     long version = HBaseRevisionManagerUtil.getOutputRevision(job);
     return new HBaseDirectRecordWriter(outputFormat.getRecordWriter(ignored, job, name,
       progress), version);
@@ -69,26 +71,28 @@ class HBaseDirectOutputFormat extends HB
   }
 
   private static class HBaseDirectRecordWriter implements
-    RecordWriter<WritableComparable<?>, Put> {
+    RecordWriter<WritableComparable<?>, Object> {
 
-    private RecordWriter<WritableComparable<?>, Put> baseWriter;
+    private RecordWriter<WritableComparable<?>, Object> baseWriter;
     private final Long outputVersion;
 
     public HBaseDirectRecordWriter(
-      RecordWriter<WritableComparable<?>, Put> baseWriter,
+      RecordWriter<WritableComparable<?>, Object> baseWriter,
       Long outputVersion) {
       this.baseWriter = baseWriter;
       this.outputVersion = outputVersion;
     }
 
     @Override
-    public void write(WritableComparable<?> key, Put value)
+    public void write(WritableComparable<?> key, Object value)
       throws IOException {
-      Put put = value;
+      Put original = toPut(value);
+      Put put = original;
       if (outputVersion != null) {
-        put = new Put(value.getRow(), outputVersion.longValue());
-        for (List<KeyValue> row : value.getFamilyMap().values()) {
-          for (KeyValue el : row) {
+        put = new Put(original.getRow(), outputVersion.longValue());
+        for (List<? extends Cell> row : original.getFamilyMap().values()) {
+          for (Cell cell : row) {
+            KeyValue el = (KeyValue)cell;
             put.add(el.getFamily(), el.getQualifier(), el.getValue());
           }
         }

Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java Wed Nov 13 16:47:11 2013
@@ -33,11 +33,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;
+import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
+import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -50,6 +51,7 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
 import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.io.serializer.WritableSerialization;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
@@ -87,6 +89,7 @@ public class HBaseHCatStorageHandler ext
 
   public final static String DEFAULT_PREFIX = "default.";
   private final static String PROPERTY_INT_OUTPUT_LOCATION = "hcat.hbase.mapreduce.intermediateOutputLocation";
+  private final static String IO_SERIALIZATIONS = "io.serializations";
 
   private Configuration hbaseConf;
   private Configuration jobConf;
@@ -135,7 +138,7 @@ public class HBaseHCatStorageHandler ext
       //TODO: Remove when HCAT-308 is fixed
       addOutputDependencyJars(jobConf);
       jobProperties.put("tmpjars", jobConf.get("tmpjars"));
-
+      setHBaseSerializers(jobProperties);
     } catch (IOException e) {
       throw new IllegalStateException("Error while configuring job properties", e);
     }
@@ -191,7 +194,7 @@ public class HBaseHCatStorageHandler ext
       jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
       addOutputDependencyJars(jobConf);
       jobProperties.put("tmpjars", jobConf.get("tmpjars"));
-
+      setHBaseSerializers(jobProperties);
     } catch (IOException e) {
       throw new IllegalStateException("Error while configuring job properties", e);
     }
@@ -339,12 +342,8 @@ public class HBaseHCatStorageHandler ext
       RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(hbaseConf);
       rm.createTable(tableName, new ArrayList<String>(uniqueColumnFamilies));
 
-    } catch (MasterNotRunningException mnre) {
-      throw new MetaException(StringUtils.stringifyException(mnre));
-    } catch (IOException ie) {
-      throw new MetaException(StringUtils.stringifyException(ie));
-    } catch (IllegalArgumentException iae) {
-      throw new MetaException(StringUtils.stringifyException(iae));
+    } catch (Exception e) {
+      throw new MetaException(StringUtils.stringifyException(e));
     }
 
   }
@@ -406,10 +405,8 @@ public class HBaseHCatStorageHandler ext
         admin = new HBaseAdmin(this.getConf());
       }
       return admin;
-    } catch (MasterNotRunningException mnre) {
-      throw new MetaException(StringUtils.stringifyException(mnre));
-    } catch (ZooKeeperConnectionException zkce) {
-      throw new MetaException(StringUtils.stringifyException(zkce));
+    } catch (Exception e) {
+      throw new MetaException(StringUtils.stringifyException(e));
     }
   }
 
@@ -544,8 +541,10 @@ public class HBaseHCatStorageHandler ext
     TableMapReduceUtil.addDependencyJars(conf,
       //ZK
       ZooKeeper.class,
-      //HBase
+      //HBase Client
       HTable.class,
+      //HBase MapReduce
+      MutationSerialization.class,
       //Hive
       HiveException.class,
       //HCatalog jar
@@ -634,4 +633,20 @@ public class HBaseHCatStorageHandler ext
     return builder.toString();
   }
 
+  /**
+   * Appends the HBase Mutation, Result and KeyValue serializations to the
+   * io.serializations entry of the given configuration.
+   *
+   * If the entry is not set at all, WritableSerialization is used as the
+   * starting value (the same fallback class the Map overload names), so a
+   * missing value is never joined into the list as the text "null".
+   *
+   * NOTE(review): every call appends the three class names again, and the
+   * output formats in this package call this more than once on the same
+   * job; confirm that repeated entries are acceptable.
+   *
+   * @param configuration configuration whose io.serializations entry is
+   *                      extended in place
+   */
+  static void setHBaseSerializers(Configuration configuration) {
+    String current = configuration.get(IO_SERIALIZATIONS, WritableSerialization.class.getName());
+    configuration.setStrings(IO_SERIALIZATIONS, current,
+        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+        KeyValueSerialization.class.getName());
+  }
+  /** Map flavour: appends the HBase serialization class names to io.serializations. */
+  static void setHBaseSerializers(Map<String, String> configuration) {
+    String base = configuration.containsKey(IO_SERIALIZATIONS)
+        ? configuration.get(IO_SERIALIZATIONS)
+        : new Configuration().get(IO_SERIALIZATIONS, WritableSerialization.class.getName());
+    StringBuilder joined = new StringBuilder().append(base);
+    joined.append(",").append(MutationSerialization.class.getName());
+    joined.append(",").append(ResultSerialization.class.getName());
+    joined.append(",").append(KeyValueSerialization.class.getName());
+    configuration.put(IO_SERIALIZATIONS, joined.toString());
+  }
 }

Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java Wed Nov 13 16:47:11 2013
@@ -23,11 +23,11 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapred.TableSplit;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hive.hbase.ResultWritable;
 import org.apache.hive.hcatalog.mapreduce.HCatMapRedUtil;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
@@ -41,7 +41,7 @@ import org.apache.hcatalog.mapreduce.Inp
 /**
  * This class HBaseInputFormat is a wrapper class of TableInputFormat in HBase.
  */
-class HBaseInputFormat implements InputFormat<ImmutableBytesWritable, Result> {
+class HBaseInputFormat implements InputFormat<ImmutableBytesWritable, ResultWritable> {
 
   private final TableInputFormat inputFormat;
 
@@ -66,7 +66,7 @@ class HBaseInputFormat implements InputF
    * org.apache.hadoop.mapreduce.TaskAttemptContext)
    */
   @Override
-  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
+  public RecordReader<ImmutableBytesWritable, ResultWritable> getRecordReader(
     InputSplit split, JobConf job, Reporter reporter)
     throws IOException {
     String jobString = job.get(HCatConstants.HCAT_KEY_JOB_INFO);

Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java Wed Nov 13 16:47:11 2013
@@ -35,8 +35,7 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.hive.hbase.ResultWritable;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.hbase.snapshot.FamilyRevision;
@@ -50,7 +49,7 @@ import org.slf4j.LoggerFactory;
  * The Class HbaseSnapshotRecordReader implements logic for filtering records
  * based on snapshot.
  */
-class HbaseSnapshotRecordReader implements RecordReader<ImmutableBytesWritable, Result> {
+class HbaseSnapshotRecordReader implements RecordReader<ImmutableBytesWritable, ResultWritable> {
 
   static final Logger LOG = LoggerFactory.getLogger(HbaseSnapshotRecordReader.class);
   private final InputJobInfo inpJobInfo;
@@ -62,8 +61,6 @@ class HbaseSnapshotRecordReader implemen
   private TableSnapshot snapshot;
   private Iterator<Result> resultItr;
   private Set<Long> allAbortedTransactions;
-  private DataOutputBuffer valueOut = new DataOutputBuffer();
-  private DataInputBuffer valueIn = new DataInputBuffer();
 
   HbaseSnapshotRecordReader(InputJobInfo inputJobInfo, Configuration conf) throws IOException {
     this.inpJobInfo = inputJobInfo;
@@ -152,8 +149,8 @@ class HbaseSnapshotRecordReader implemen
   }
 
   @Override
-  public Result createValue() {
-    return new Result();
+  public ResultWritable createValue() {
+    return new ResultWritable();
   }
 
   @Override
@@ -170,7 +167,7 @@ class HbaseSnapshotRecordReader implemen
   }
 
   @Override
-  public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
+  public boolean next(ImmutableBytesWritable key, ResultWritable value) throws IOException {
     if (this.resultItr == null) {
       LOG.warn("The HBase result iterator is found null. It is possible"
         + " that the record reader has already been closed.");
@@ -182,10 +179,7 @@ class HbaseSnapshotRecordReader implemen
           // Update key and value. Currently no way to avoid serialization/de-serialization
           // as no setters are available.
           key.set(hbaseRow.getRow());
-          valueOut.reset();
-          hbaseRow.write(valueOut);
-          valueIn.reset(valueOut.getData(), valueOut.getLength());
-          value.readFields(valueIn);
+          value.setResult(hbaseRow);
           return true;
         }
 

Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java Wed Nov 13 16:47:11 2013
@@ -19,23 +19,22 @@
 
 package org.apache.hcatalog.hbase;
 
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
-import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;
-
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus;
@@ -47,11 +46,10 @@ import org.apache.hadoop.mapreduce.lib.i
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hive.hcatalog.mapreduce.HCatMapRedUtil;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner.DEFAULT_PATH;
-
 
 /**
  * MapReduce job which reads a series of Puts stored in a sequence file
@@ -71,12 +69,8 @@ class ImportSequenceFile {
     @Override
     public void map(ImmutableBytesWritable rowKey, Put value,
             Context context)
-      throws IOException {
-      try {
-        context.write(new ImmutableBytesWritable(value.getRow()), value);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
+      throws IOException, InterruptedException {
+      context.write(new ImmutableBytesWritable(value.getRow()), value);
     }
   }
 
@@ -158,6 +152,7 @@ class ImportSequenceFile {
 
   private static Job createSubmittableJob(Configuration conf, String tableName, Path inputDir, Path scratchDir, boolean localMode)
     throws IOException {
+    HBaseHCatStorageHandler.setHBaseSerializers(conf);
     Job job = new Job(conf, NAME + "_" + tableName);
     job.setJarByClass(SequenceFileImporter.class);
     FileInputFormat.setInputPaths(job, inputDir);
@@ -170,18 +165,35 @@ class ImportSequenceFile {
     job.setMapOutputKeyClass(ImmutableBytesWritable.class);
     job.setMapOutputValueClass(Put.class);
     HFileOutputFormat.configureIncrementalLoad(job, table);
+    URI partitionURI;
+    try {
+      partitionURI = new URI(TotalOrderPartitioner.getPartitionFile(job.getConfiguration())
+          + "#" + TotalOrderPartitioner.DEFAULT_PATH);
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+    DistributedCache.addCacheFile(partitionURI, job.getConfiguration());
+    DistributedCache.createSymlink(job.getConfiguration());
     //override OutputFormatClass with our own so we can include cleanup in the committer
     job.setOutputFormatClass(ImporterOutputFormat.class);
 
     //local mode doesn't support symbolic links so we have to manually set the actual path
     if (localMode) {
       String partitionFile = null;
-      for (URI uri : DistributedCache.getCacheFiles(job.getConfiguration())) {
-        if (DEFAULT_PATH.equals(uri.getFragment())) {
+      URI[] uris = DistributedCache.getCacheFiles(job.getConfiguration());
+      if(uris == null) {
+        throw new IllegalStateException("No cache file existed in job configuration");
+      }
+      for (URI uri : uris) {
+        if (TotalOrderPartitioner.DEFAULT_PATH.equals(uri.getFragment())) {
           partitionFile = uri.toString();
           break;
         }
       }
+      if(partitionFile == null) {
+        throw new IllegalStateException("Unable to find " +
+            TotalOrderPartitioner.DEFAULT_PATH + " in cache");
+      }
       partitionFile = partitionFile.substring(0, partitionFile.lastIndexOf("#"));
       job.getConfiguration().set(TotalOrderPartitioner.PARTITIONER_PATH, partitionFile.toString());
     }

Added: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RPCConverter.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RPCConverter.java?rev=1541609&view=auto
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RPCConverter.java (added)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RPCConverter.java Wed Nov 13 16:47:11 2013
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Translates between the snapshot model classes of this package and the
+ * messages declared in RevisionManagerEndpointProtos.
+ */
+public class RPCConverter {
+
+  /**
+   * Turns each protobuf FamilyRevision into a FamilyRevision built from its
+   * revision and timestamp, keeping the order of the input list.
+   */
+  List<FamilyRevision> convertFamilyRevisions(List<RevisionManagerEndpointProtos.FamilyRevision> revisions) {
+    List<FamilyRevision> converted = new ArrayList<FamilyRevision>();
+    for (RevisionManagerEndpointProtos.FamilyRevision source : revisions) {
+      converted.add(new FamilyRevision(source.getRevision(), source.getTimestamp()));
+    }
+    return converted;
+  }
+
+  /**
+   * Builds the protobuf form of a TableSnapshot: table name, latest revision
+   * and one ColumnFamilyRevision per entry of the column family revision map.
+   */
+  RevisionManagerEndpointProtos.TableSnapshot convertTableSnapshot(TableSnapshot tableSnapshot) {
+    RevisionManagerEndpointProtos.TableSnapshot.Builder message =
+        RevisionManagerEndpointProtos.TableSnapshot.newBuilder();
+    message.setTableName(tableSnapshot.getTableName());
+    message.setLatestRevision(tableSnapshot.getLatestRevision());
+    for (Map.Entry<String, Long> familyRevision : tableSnapshot.getColumnFamilyRevisionMap().entrySet()) {
+      RevisionManagerEndpointProtos.TableSnapshot.ColumnFamilyRevision.Builder item =
+          RevisionManagerEndpointProtos.TableSnapshot.ColumnFamilyRevision.newBuilder();
+      item.setKey(familyRevision.getKey());
+      item.setValue(familyRevision.getValue());
+      message.addColumnFamilyRevision(item.build());
+    }
+    return message.build();
+  }
+
+  /**
+   * Rebuilds a TableSnapshot from its protobuf form, collecting the
+   * ColumnFamilyRevision list into a key-to-value map.
+   */
+  TableSnapshot convertTableSnapshot(RevisionManagerEndpointProtos.TableSnapshot tableSnapshot) {
+    Map<String, Long> revisionsByFamily = new HashMap<String, Long>();
+    for (RevisionManagerEndpointProtos.TableSnapshot.ColumnFamilyRevision item
+        : tableSnapshot.getColumnFamilyRevisionList()) {
+      revisionsByFamily.put(item.getKey(), item.getValue());
+    }
+    return new TableSnapshot(tableSnapshot.getTableName(), revisionsByFamily,
+        tableSnapshot.getLatestRevision());
+  }
+
+  /**
+   * Builds the protobuf form of a Transaction from its table name, column
+   * families, revision number, time stamp and keep-alive value.
+   */
+  RevisionManagerEndpointProtos.Transaction convertTransaction(Transaction transaction) {
+    RevisionManagerEndpointProtos.Transaction.Builder message =
+        RevisionManagerEndpointProtos.Transaction.newBuilder();
+    message.setTableName(transaction.getTableName());
+    message.addAllColumnFamilies(transaction.getColumnFamilies());
+    message.setRevision(transaction.getRevisionNumber());
+    message.setTimeStamp(transaction.getTimeStamp());
+    message.setKeepAlive(transaction.getKeepAliveValue());
+    return message.build();
+  }
+
+  /**
+   * Rebuilds a Transaction from its protobuf form; the keep-alive value is
+   * applied after construction through setKeepAlive.
+   */
+  Transaction convertTransaction(RevisionManagerEndpointProtos.Transaction transaction) {
+    Transaction restored = new Transaction(transaction.getTableName(),
+        transaction.getColumnFamiliesList(), transaction.getRevision(), transaction.getTimeStamp());
+    restored.setKeepAlive(transaction.getKeepAlive());
+    return restored;
+  }
+}

Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java Wed Nov 13 16:47:11 2013
@@ -93,7 +93,7 @@ public interface RevisionManager {
    */
   @Deprecated
   public Transaction beginWriteTransaction(String table,
-                       List<String> families, long keepAlive) throws IOException;
+                       List<String> families, Long keepAlive) throws IOException;
 
   /**
    * Commit the write transaction.
@@ -146,7 +146,7 @@ public interface RevisionManager {
    * @throws IOException
    */
   @Deprecated
-  public TableSnapshot createSnapshot(String tableName, long revision)
+  public TableSnapshot createSnapshot(String tableName, Long revision)
     throws IOException;
 
   /**

Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java Wed Nov 13 16:47:11 2013
@@ -19,14 +19,36 @@
 package org.apache.hcatalog.hbase.snapshot;
 
 import java.io.IOException;
-import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.AbortWriteTransactionRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.AbortWriteTransactionResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.BeginWriteTransactionRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.BeginWriteTransactionResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.CommitWriteTransactionRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.CommitWriteTransactionResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.CreateSnapshotRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.CreateSnapshotResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.CreateTableRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.CreateTableResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.DropTableRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.DropTableResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.GetAbortedWriteTransactionsRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.GetAbortedWriteTransactionsResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.KeepAliveTransactionRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.KeepAliveTransactionResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.RevisionManagerEndpointService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+
 /**
  * Implementation of RevisionManager as HBase RPC endpoint. This class will control the lifecycle of
  * and delegate to the actual RevisionManager implementation and make it available as a service
@@ -34,21 +56,21 @@ import org.slf4j.LoggerFactory;
  * In the case of {@link ZKBasedRevisionManager} now only the region servers need write access to
  * manage revision data.
  */
-public class RevisionManagerEndpoint extends BaseEndpointCoprocessor implements RevisionManagerProtocol {
+public class RevisionManagerEndpoint extends RevisionManagerEndpointService implements Coprocessor, CoprocessorService {
 
   private static final Logger LOGGER =
     LoggerFactory.getLogger(RevisionManagerEndpoint.class.getName());
 
+  private final RPCConverter rpcConverter = new RPCConverter();
   private RevisionManager rmImpl = null;
 
   @Override
   public void start(CoprocessorEnvironment env) {
-    super.start(env);
     try {
       Configuration conf = RevisionManagerConfiguration.create(env.getConfiguration());
       String className = conf.get(RMConstants.REVISION_MGR_ENDPOINT_IMPL_CLASS,
         ZKBasedRevisionManager.class.getName());
-      LOGGER.debug("Using Revision Manager implementation: {}", className);
+      LOGGER.info("Using Revision Manager implementation: {}", className);
       rmImpl = RevisionManagerFactory.getOpenedRevisionManager(className, conf);
     } catch (IOException e) {
       LOGGER.error("Failed to initialize revision manager", e);
@@ -57,85 +79,140 @@ public class RevisionManagerEndpoint ext
 
   @Override
   public void stop(CoprocessorEnvironment env) {
-    if (rmImpl != null) {
-      try {
+    try {
+      if (rmImpl != null) {
         rmImpl.close();
-      } catch (IOException e) {
-        LOGGER.warn("Error closing revision manager.", e);
       }
+    } catch (IOException e) {
+      LOGGER.warn("Error closing revision manager.", e);
     }
-    super.stop(env);
-  }
-
-  @Override
-  public void initialize(Configuration conf) {
-    // do nothing, HBase controls life cycle
-  }
-
-  @Override
-  public void open() throws IOException {
-    // do nothing, HBase controls life cycle
   }
 
   @Override
-  public void close() throws IOException {
-    // do nothing, HBase controls life cycle
+  public Service getService() {
+    return this;
   }
 
   @Override
-  public void createTable(String table, List<String> columnFamilies) throws IOException {
-    rmImpl.createTable(table, columnFamilies);
-  }
-
-  @Override
-  public void dropTable(String table) throws IOException {
-    rmImpl.dropTable(table);
-  }
-
-  @Override
-  public Transaction beginWriteTransaction(String table, List<String> families)
-    throws IOException {
-    return rmImpl.beginWriteTransaction(table, families);
+  public void createTable(RpcController controller,
+      CreateTableRequest request, RpcCallback<CreateTableResponse> done) {
+    if(rmImpl != null) {
+      try {
+        rmImpl.createTable(request.getTableName(), request.getColumnFamiliesList());
+        done.run(CreateTableResponse.newBuilder().build());
+      } catch(IOException e) {
+        ResponseConverter.setControllerException(controller, e);
+      }
+    }
   }
 
   @Override
-  public Transaction beginWriteTransaction(String table,
-                       List<String> families, long keepAlive) throws IOException {
-    return rmImpl.beginWriteTransaction(table, families, keepAlive);
+  public void dropTable(RpcController controller, DropTableRequest request,
+      RpcCallback<DropTableResponse> done) {
+    if(rmImpl != null) {
+      try {
+        rmImpl.dropTable(request.getTableName());
+        done.run(DropTableResponse.newBuilder().build());
+      } catch(IOException e) {
+        ResponseConverter.setControllerException(controller, e);
+      }
+    }
   }
 
   @Override
-  public void commitWriteTransaction(Transaction transaction)
-    throws IOException {
-    rmImpl.commitWriteTransaction(transaction);
+  public void beginWriteTransaction(RpcController controller,
+      BeginWriteTransactionRequest request,
+      RpcCallback<BeginWriteTransactionResponse> done) {
+    if(rmImpl != null) {
+      try {
+        Transaction transaction;
+        if(request.hasKeepAlive()) {
+          transaction = rmImpl.beginWriteTransaction(request.getTableName(), request.getColumnFamiliesList(),
+              request.getKeepAlive());
+        } else {
+          transaction = rmImpl.beginWriteTransaction(request.getTableName(), request.getColumnFamiliesList());
+        }
+        done.run(BeginWriteTransactionResponse.newBuilder()
+                .setTransaction(rpcConverter.convertTransaction(transaction)).build());
+      } catch(IOException e) {
+        ResponseConverter.setControllerException(controller, e);
+      }
+    }
   }
 
   @Override
-  public void abortWriteTransaction(Transaction transaction)
-    throws IOException {
-    rmImpl.abortWriteTransaction(transaction);
+  public void commitWriteTransaction(RpcController controller,
+      CommitWriteTransactionRequest request,
+      RpcCallback<CommitWriteTransactionResponse> done) {
+    if(rmImpl != null) {
+      try {
+        rmImpl.commitWriteTransaction(rpcConverter.convertTransaction(request.getTransaction()));
+        done.run(CommitWriteTransactionResponse.newBuilder().build());
+      } catch(IOException e) {
+        ResponseConverter.setControllerException(controller, e);
+      }
+    }
   }
 
   @Override
-  public TableSnapshot createSnapshot(String tableName) throws IOException {
-    return rmImpl.createSnapshot(tableName);
+  public void abortWriteTransaction(RpcController controller,
+      AbortWriteTransactionRequest request,
+      RpcCallback<AbortWriteTransactionResponse> done) {
+    if(rmImpl != null) {
+      try {
+        rmImpl.abortWriteTransaction(rpcConverter.convertTransaction(request.getTransaction()));
+        done.run(AbortWriteTransactionResponse.newBuilder().build());
+      } catch(IOException e) {
+        ResponseConverter.setControllerException(controller, e);
+      }
+    }
   }
 
   @Override
-  public TableSnapshot createSnapshot(String tableName, long revision)
-    throws IOException {
-    return rmImpl.createSnapshot(tableName, revision);
+  public void getAbortedWriteTransactions(RpcController controller,
+      GetAbortedWriteTransactionsRequest request,
+      RpcCallback<GetAbortedWriteTransactionsResponse> done) {
+    if(rmImpl != null) { // when rmImpl is null nothing below runs: no callback, no controller error
+      try {
+        rmImpl.getAbortedWriteTransactions(request.getTableName(), request.getColumnFamily()); // NOTE(review): the returned list is discarded here
+        done.run(GetAbortedWriteTransactionsResponse.newBuilder().build()); // NOTE(review): response is built with no family_revisions entries - TODO populate it from the result above
+      } catch(IOException e) {
+        ResponseConverter.setControllerException(controller, e); // report the failure through the controller; done is not invoked on this path
+      }
+    }
   }
 
   @Override
-  public void keepAlive(Transaction transaction) throws IOException {
-    rmImpl.keepAlive(transaction);
+  public void createSnapshot(RpcController controller,
+      CreateSnapshotRequest request, RpcCallback<CreateSnapshotResponse> done) {
+    if(rmImpl != null) {
+      try {
+        TableSnapshot snapshot;
+        if(request.hasRevision()) {
+          snapshot = rmImpl.createSnapshot(request.getTableName(), request.getRevision());
+        } else {
+          snapshot = rmImpl.createSnapshot(request.getTableName());
+        }
+        done.run(CreateSnapshotResponse.newBuilder()
+                .setTableSnapshot(rpcConverter.convertTableSnapshot(snapshot)).build());
+      } catch(IOException e) {
+        ResponseConverter.setControllerException(controller, e);
+      }
+    }
   }
 
   @Override
-  public List<FamilyRevision> getAbortedWriteTransactions(String table,
-                              String columnFamily) throws IOException {
-    return rmImpl.getAbortedWriteTransactions(table, columnFamily);
+  public void keepAliveTransaction(RpcController controller,
+      KeepAliveTransactionRequest request,
+      RpcCallback<KeepAliveTransactionResponse> done) {
+    if(rmImpl != null) {
+      try {
+        rmImpl.keepAlive(rpcConverter.convertTransaction(request.getTransaction()));
+        done.run(KeepAliveTransactionResponse.newBuilder().build());
+      } catch(IOException e) {
+        ResponseConverter.setControllerException(controller, e);
+      }
+    }
   }
 
 }

Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java Wed Nov 13 16:47:11 2013
@@ -21,12 +21,33 @@ package org.apache.hcatalog.hbase.snapsh
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.AbortWriteTransactionRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.AbortWriteTransactionResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.BeginWriteTransactionRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.BeginWriteTransactionResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.CommitWriteTransactionRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.CommitWriteTransactionResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.CreateSnapshotRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.CreateSnapshotResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.CreateTableRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.CreateTableResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.DropTableRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.DropTableResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.GetAbortedWriteTransactionsRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.GetAbortedWriteTransactionsResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.KeepAliveTransactionRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.KeepAliveTransactionResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.RevisionManagerEndpointService;
 
 /**
  * This class is nothing but a delegate for the enclosed proxy,
@@ -34,17 +55,18 @@ import org.apache.hadoop.hbase.util.Byte
  */
 public class RevisionManagerEndpointClient implements RevisionManager, Configurable {
 
-  private Configuration conf = null;
-  private RevisionManager rmProxy;
+  private final RPCConverter rpcConverter = new RPCConverter();
+  private Configuration conf;
+  private HTable htable;
 
   @Override
   public Configuration getConf() {
-    return this.conf;
+    return conf;
   }
 
   @Override
-  public void setConf(Configuration arg0) {
-    this.conf = arg0;
+  public void setConf(Configuration conf) {
+    this.conf = conf;
   }
 
   @Override
@@ -56,70 +78,207 @@ public class RevisionManagerEndpointClie
   public void open() throws IOException {
     // clone to adjust RPC settings unique to proxy
     Configuration clonedConf = new Configuration(conf);
-    // conf.set("hbase.ipc.client.connect.max.retries", "0");
-    // conf.setInt(HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS, 1);
     clonedConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // do not retry RPC
-    HTable table = new HTable(clonedConf, HConstants.ROOT_TABLE_NAME);
-    rmProxy = table.coprocessorProxy(RevisionManagerProtocol.class,
-      Bytes.toBytes("anyRow"));
-    rmProxy.open();
+    htable = new HTable(clonedConf, TableName.META_TABLE_NAME.getNameAsString());
   }
 
   @Override
   public void close() throws IOException {
-    rmProxy.close();
+    htable.close();
   }
 
   @Override
-  public void createTable(String table, List<String> columnFamilies) throws IOException {
-    rmProxy.createTable(table, columnFamilies);
+  public void createTable(final String table, final List<String> columnFamilies) throws IOException {
+    call(new Batch.Call<RevisionManagerEndpointService, Void>() {
+      @Override
+      public Void call(RevisionManagerEndpointService service)
+        throws IOException {
+        ServerRpcController controller = new ServerRpcController();
+        BlockingRpcCallback<CreateTableResponse> done =
+            new BlockingRpcCallback<CreateTableResponse>();
+        CreateTableRequest request = CreateTableRequest.newBuilder()
+                .setTableName(table).addAllColumnFamilies(columnFamilies).build();
+        service.createTable(controller, request, done);
+        blockOnResponse(done, controller);
+        return null;
+      }
+    });
   }
 
   @Override
-  public void dropTable(String table) throws IOException {
-    rmProxy.dropTable(table);
+  public void dropTable(final String table) throws IOException { // asks the endpoint service to drop the revision data for 'table'
+    call(new Batch.Call<RevisionManagerEndpointService, Void>() {
+      @Override
+      public Void call(RevisionManagerEndpointService service)
+        throws IOException {
+        ServerRpcController controller = new ServerRpcController();
+        BlockingRpcCallback<DropTableResponse> done =
+            new BlockingRpcCallback<DropTableResponse>();
+        DropTableRequest request = DropTableRequest.newBuilder()
+              .setTableName(table).build();
+        service.dropTable(controller, request, done); // hand over the controller (not null) so a failure is recorded where blockOnResponse looks for it
+        blockOnResponse(done, controller); // throws if the controller recorded a failure
+        return null; // Void call: there is no payload to return
+      }
+    });
   }
 
   @Override
-  public Transaction beginWriteTransaction(String table, List<String> families) throws IOException {
-    return rmProxy.beginWriteTransaction(table, families);
+  public Transaction beginWriteTransaction(final String table, final List<String> families) throws IOException {
+    return beginWriteTransaction(table, families, null);
   }
 
   @Override
-  public Transaction beginWriteTransaction(String table, List<String> families, long keepAlive)
+  public Transaction beginWriteTransaction(final String table, final List<String> families, final Long keepAlive)
     throws IOException {
-    return rmProxy.beginWriteTransaction(table, families, keepAlive);
+    return call(new Batch.Call<RevisionManagerEndpointService, Transaction>() {
+      @Override
+      public Transaction call(RevisionManagerEndpointService service)
+        throws IOException {
+        ServerRpcController controller = new ServerRpcController();
+        BlockingRpcCallback<BeginWriteTransactionResponse> done =
+            new BlockingRpcCallback<BeginWriteTransactionResponse>();
+        BeginWriteTransactionRequest.Builder builder = BeginWriteTransactionRequest.newBuilder()
+              .setTableName(table)
+              .addAllColumnFamilies(families);
+        if(keepAlive != null) {
+          builder.setKeepAlive(keepAlive);
+        }
+        service.beginWriteTransaction(controller, builder.build(), done);
+        return rpcConverter.convertTransaction(blockOnResponse(done, controller).getTransaction());
+      }
+    });
   }
 
   @Override
-  public void commitWriteTransaction(Transaction transaction) throws IOException {
-    rmProxy.commitWriteTransaction(transaction);
+  public void commitWriteTransaction(final Transaction transaction) throws IOException {
+    call(new Batch.Call<RevisionManagerEndpointService, Void>() {
+      @Override
+      public Void call(RevisionManagerEndpointService service)
+        throws IOException {
+        ServerRpcController controller = new ServerRpcController();
+        BlockingRpcCallback<CommitWriteTransactionResponse> done =
+            new BlockingRpcCallback<CommitWriteTransactionResponse>();
+        CommitWriteTransactionRequest request = CommitWriteTransactionRequest.newBuilder()
+              .setTransaction(rpcConverter.convertTransaction(transaction)).build();
+        service.commitWriteTransaction(controller, request, done);
+        blockOnResponse(done, controller);
+        return null;
+      }
+    });
   }
 
   @Override
-  public void abortWriteTransaction(Transaction transaction) throws IOException {
-    rmProxy.abortWriteTransaction(transaction);
+  public void abortWriteTransaction(final Transaction transaction) throws IOException {
+    call(new Batch.Call<RevisionManagerEndpointService, Void>() {
+      @Override
+      public Void call(RevisionManagerEndpointService service)
+        throws IOException {
+        ServerRpcController controller = new ServerRpcController();
+        BlockingRpcCallback<AbortWriteTransactionResponse> done =
+            new BlockingRpcCallback<AbortWriteTransactionResponse>();
+        AbortWriteTransactionRequest request = AbortWriteTransactionRequest.newBuilder()
+              .setTransaction(rpcConverter.convertTransaction(transaction)).build();
+        service.abortWriteTransaction(controller, request, done);
+        blockOnResponse(done, controller);
+        return null;
+      }
+    });
   }
 
   @Override
-  public List<FamilyRevision> getAbortedWriteTransactions(String table, String columnFamily)
+  public List<FamilyRevision> getAbortedWriteTransactions(final String table, final String columnFamily)
     throws IOException {
-    return rmProxy.getAbortedWriteTransactions(table, columnFamily);
+    return call(new Batch.Call<RevisionManagerEndpointService, List<FamilyRevision>>() {
+      @Override
+      public List<FamilyRevision> call(RevisionManagerEndpointService service)
+        throws IOException {
+        ServerRpcController controller = new ServerRpcController();
+        BlockingRpcCallback<GetAbortedWriteTransactionsResponse> done =
+            new BlockingRpcCallback<GetAbortedWriteTransactionsResponse>();
+        GetAbortedWriteTransactionsRequest request = GetAbortedWriteTransactionsRequest.newBuilder()
+              .setTableName(table)
+              .setColumnFamily(columnFamily)
+              .build();
+        service.getAbortedWriteTransactions(controller, request, done);
+        return rpcConverter.convertFamilyRevisions(blockOnResponse(done, controller).getFamilyRevisionsList());
+      }
+    });
   }
 
   @Override
   public TableSnapshot createSnapshot(String tableName) throws IOException {
-    return rmProxy.createSnapshot(tableName);
+    return createSnapshot(tableName, null);
   }
 
   @Override
-  public TableSnapshot createSnapshot(String tableName, long revision) throws IOException {
-    return rmProxy.createSnapshot(tableName, revision);
+  public TableSnapshot createSnapshot(final String tableName, final Long revision) throws IOException {
+    return call(new Batch.Call<RevisionManagerEndpointService, TableSnapshot>() {
+      @Override
+      public TableSnapshot call(RevisionManagerEndpointService service)
+        throws IOException {
+        ServerRpcController controller = new ServerRpcController();
+        BlockingRpcCallback<CreateSnapshotResponse> done =
+            new BlockingRpcCallback<CreateSnapshotResponse>();
+        CreateSnapshotRequest.Builder builder = CreateSnapshotRequest.newBuilder()
+              .setTableName(tableName);
+        if(revision != null) {
+          builder.setRevision(revision);
+        }
+        service.createSnapshot(controller, builder.build(), done);
+        return rpcConverter.convertTableSnapshot(blockOnResponse(done, controller).getTableSnapshot());
+      }
+    });
   }
 
   @Override
-  public void keepAlive(Transaction transaction) throws IOException {
-    rmProxy.keepAlive(transaction);
+  public void keepAlive(final Transaction transaction) throws IOException {
+    call(new Batch.Call<RevisionManagerEndpointService, Void>() {
+      @Override
+      public Void call(RevisionManagerEndpointService service)
+        throws IOException {
+        ServerRpcController controller = new ServerRpcController();
+        BlockingRpcCallback<KeepAliveTransactionResponse> done =
+            new BlockingRpcCallback<KeepAliveTransactionResponse>();
+        KeepAliveTransactionRequest request = KeepAliveTransactionRequest.newBuilder()
+              .setTransaction(rpcConverter.convertTransaction(transaction)).build();
+        service.keepAliveTransaction(controller, request, done);
+        blockOnResponse(done, controller);
+        return null;
+      }
+    });
+  }
+  private <R> R blockOnResponse(BlockingRpcCallback<R> done, ServerRpcController controller)
+    throws IOException {
+    R response = done.get();
+    if(controller.failedOnException()) {
+      throw controller.getFailedOn();
+    }
+    if(controller.failed()) {
+      String error = controller.errorText();
+      if(error == null) {
+        error = "Server indicated failure but error text was empty";
+      }
+      throw new RuntimeException(error);
+    }
+    return response;
+  }
+  private <R> R call(Batch.Call<RevisionManagerEndpointService, R> callable) throws IOException { // runs 'callable' against the endpoint service via htable
+    try {
+      Map<byte[], R> result = htable.coprocessorService(RevisionManagerEndpointService.class, null, null, callable); // null start and end keys are passed
+      if(result.isEmpty()) {
+        return null; // no entry came back, so there is no value to hand to the caller
+      }
+      return result.values().iterator().next(); // only the first value of the result map is used
+    } catch(IOException e) {
+      throw e; // already a declared exception type: rethrow unchanged
+    } catch(RuntimeException e) {
+      throw e; // unchecked: rethrow unchanged rather than wrapping it again below
+    } catch(Error e) {
+      throw e; // never wrap Errors
+    } catch(Throwable throwable) {
+      throw new RuntimeException(throwable); // any other checked Throwable is wrapped, keeping it as the cause
+    }
   }
 
 }

Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java Wed Nov 13 16:47:11 2013
@@ -59,7 +59,14 @@ public class TableSnapshot implements Se
    * @return List<String> A list of column families associated with the snapshot.
    */
   public List<String> getColumnFamilies(){
-    return  new ArrayList<String>(this.cfRevisionMap.keySet());
+    return new ArrayList<String>(this.cfRevisionMap.keySet());
+  }
+
+  /**
+   * For wire serialization only
+   */
+  Map<String, Long> getColumnFamilyRevisionMap() {
+    return cfRevisionMap;
   }
 
   /**

Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java Wed Nov 13 16:47:11 2013
@@ -64,6 +64,13 @@ public class Transaction implements Seri
   }
 
   /**
+   * For wire serialization only
+   */
+  long getTimeStamp() {
+    return timeStamp;
+  }
+
+  /**
    * @return The expire timestamp associated with a transaction.
    */
   long getTransactionExpireTimeStamp() {

Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java Wed Nov 13 16:47:11 2013
@@ -119,7 +119,7 @@ public class ZKBasedRevisionManager impl
    * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#beginWriteTransaction(java.lang.String, java.util.List, long)
    */
   public Transaction beginWriteTransaction(String table,
-                       List<String> families, long keepAlive) throws IOException {
+                       List<String> families, Long keepAlive) throws IOException {
 
     checkInputParams(table, families);
     zkUtil.setUpZnodesForTable(table, families);
@@ -175,7 +175,7 @@ public class ZKBasedRevisionManager impl
    */
   public Transaction beginWriteTransaction(String table, List<String> families)
     throws IOException {
-    return beginWriteTransaction(table, families, -1);
+    return beginWriteTransaction(table, families, -1L);
   }
 
   /**
@@ -352,7 +352,7 @@ public class ZKBasedRevisionManager impl
   /* @throws IOException
    * @see org.apache.hcatalog.hbase.snapshot.RevsionManager#createSnapshot(java.lang.String, long)
    */
-  public TableSnapshot createSnapshot(String tableName, long revision) throws IOException {
+  public TableSnapshot createSnapshot(String tableName, Long revision) throws IOException {
 
     long currentID = zkUtil.currentID(tableName);
     if (revision > currentID) {

Added: hive/trunk/hcatalog/storage-handlers/hbase/src/protobuf/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.proto
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/protobuf/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.proto?rev=1541609&view=auto
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/protobuf/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.proto (added)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/protobuf/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.proto Wed Nov 13 16:47:11 2013
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+option java_package = "org.apache.hcatalog.hbase.snapshot";
+option java_outer_classname = "RevisionManagerEndpointProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+message CreateTableRequest {
+  required string table_name = 1;
+  repeated string column_families = 2;
+}
+message CreateTableResponse {}
+
+message DropTableRequest {
+  required string table_name = 1;
+}
+message DropTableResponse {}
+
+message BeginWriteTransactionRequest {
+  required string table_name = 1;
+  optional int64 keep_alive = 2;
+  repeated string column_families = 3;
+}
+message BeginWriteTransactionResponse {
+  required Transaction transaction = 1;
+}
+
+message CommitWriteTransactionRequest {
+  required Transaction transaction = 1;
+}
+message CommitWriteTransactionResponse {}
+
+message AbortWriteTransactionRequest {
+  required Transaction transaction = 1;
+}
+message AbortWriteTransactionResponse {}
+
+message GetAbortedWriteTransactionsRequest {
+  required string table_name = 1;
+  required string column_family = 2;
+}
+message GetAbortedWriteTransactionsResponse {
+  repeated FamilyRevision family_revisions = 1;
+}
+
+message CreateSnapshotRequest {
+ required string table_name = 1;
+ optional int64 revision = 2;
+}
+message CreateSnapshotResponse {
+ required TableSnapshot table_snapshot = 1;
+}
+
+message KeepAliveTransactionRequest {
+  required Transaction transaction = 1;
+}
+message KeepAliveTransactionResponse{}
+
+message FamilyRevision {
+  required int64 revision = 1;
+  required int64 timestamp = 2;
+}
+message Transaction {
+  required string table_name = 1;
+  required int64 time_stamp = 2;
+  required int64 keep_alive = 3;
+  required int64 revision = 4;
+  repeated string column_families = 5;
+}
+message TableSnapshot {
+  required string table_name = 1;
+  required int64 latest_revision = 2;
+  message ColumnFamilyRevision {
+    required string key = 1;
+    required int64 value = 2;
+  }
+  repeated ColumnFamilyRevision column_family_revision = 3;
+}
+
+service RevisionManagerEndpointService {
+  rpc createTable(CreateTableRequest) returns(CreateTableResponse);
+  rpc dropTable(DropTableRequest) returns(DropTableResponse);
+  rpc beginWriteTransaction(BeginWriteTransactionRequest) returns(BeginWriteTransactionResponse);
+  rpc commitWriteTransaction(CommitWriteTransactionRequest) returns(CommitWriteTransactionResponse);
+  rpc abortWriteTransaction(AbortWriteTransactionRequest) returns(AbortWriteTransactionResponse);
+  rpc getAbortedWriteTransactions(GetAbortedWriteTransactionsRequest) returns(GetAbortedWriteTransactionsResponse);
+  rpc createSnapshot(CreateSnapshotRequest) returns(CreateSnapshotResponse);
+  rpc keepAliveTransaction(KeepAliveTransactionRequest) returns (KeepAliveTransactionResponse);
+}

Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java Wed Nov 13 16:47:11 2013
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
@@ -121,7 +122,7 @@ public class ManyMiniCluster {
 
   protected synchronized void stop() {
     if (hbaseCluster != null) {
-      HConnectionManager.deleteAllConnections(true);
+      HConnectionManager.deleteAllConnections();
       try {
         hbaseCluster.shutdown();
       } catch (Exception e) {
@@ -266,7 +267,7 @@ public class ManyMiniCluster {
       hbaseCluster = new MiniHBaseCluster(hbaseConf, numRegionServers);
       hbaseConf.set("hbase.master", hbaseCluster.getMaster().getServerName().getHostAndPort());
       //opening the META table ensures that cluster is running
-      new HTable(hbaseConf, HConstants.META_TABLE_NAME);
+      new HTable(hbaseConf, TableName.META_TABLE_NAME.getNameAsString());
     } catch (Exception e) {
       throw new IllegalStateException("Failed to setup HBase Cluster", e);
     }

Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java Wed Nov 13 16:47:11 2013
@@ -19,6 +19,15 @@
 
 package org.apache.hcatalog.hbase;
 
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -71,15 +80,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 /**
  * Tests components of HBaseHCatStorageHandler using ManyMiniCluster.
  * Including ImportSequenceFile and HBaseBulkOutputFormat
@@ -258,6 +258,7 @@ public class TestHBaseBulkOutputFormat e
       }
       index++;
     }
+    table.close();
     //test if load count is the same
     assertEquals(data.length, index);
     //test if scratch directory was erased
@@ -297,6 +298,7 @@ public class TestHBaseBulkOutputFormat e
 
 
     //create job
+    HBaseHCatStorageHandler.setHBaseSerializers(conf);
     Job job = new Job(conf, testName);
     job.setWorkingDirectory(new Path(methodTestDir, "mr_work"));
     job.setJarByClass(this.getClass());
@@ -335,6 +337,7 @@ public class TestHBaseBulkOutputFormat e
       }
       index++;
     }
+    table.close();
     //test if load count is the same
     assertEquals(data.length, index);
     //test if scratch directory was erased
@@ -432,6 +435,7 @@ public class TestHBaseBulkOutputFormat e
       }
       index++;
     }
+    table.close();
     //test if load count is the same
     assertEquals(data.length, index);
   }
@@ -514,6 +518,7 @@ public class TestHBaseBulkOutputFormat e
       }
       index++;
     }
+    table.close();
     //test if load count is the same
     assertEquals(data.length, index);
   }
@@ -608,6 +613,7 @@ public class TestHBaseBulkOutputFormat e
     job.setOutputKeyClass(BytesWritable.class);
     job.setOutputValueClass(Text.class);
     job.setNumReduceTasks(0);
+    table.close();
     assertTrue(job.waitForCompletion(true));
   }
 

Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java Wed Nov 13 16:47:11 2013
@@ -189,6 +189,7 @@ public class TestHBaseDirectOutputFormat
       }
       index++;
     }
+    table.close();
     assertEquals(data.length, index);
   }
 
@@ -356,6 +357,7 @@ public class TestHBaseDirectOutputFormat
       }
       count++;
     }
+    table.close();
     assertEquals(data.length - 1, count);
 
     // verify that the inputformat returns empty results.

Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHCatHBaseInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHCatHBaseInputFormat.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHCatHBaseInputFormat.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHCatHBaseInputFormat.java Wed Nov 13 16:47:11 2013
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.hbase.ResultWritable;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -546,7 +547,7 @@ public class TestHCatHBaseInputFormat ex
   }
 
   static class MapReadProjectionHTable
-    implements org.apache.hadoop.mapred.Mapper<ImmutableBytesWritable, Result, WritableComparable<?>, Text> {
+    implements org.apache.hadoop.mapred.Mapper<ImmutableBytesWritable, Object, WritableComparable<?>, Text> {
 
     static boolean error = false;
     static int count = 0;
@@ -560,9 +561,17 @@ public class TestHCatHBaseInputFormat ex
     }
 
     @Override
-    public void map(ImmutableBytesWritable key, Result result,
+    public void map(ImmutableBytesWritable key, Object resultObj,
         OutputCollector<WritableComparable<?>, Text> output, Reporter reporter)
       throws IOException {
+      Result result;
+      if (resultObj instanceof Result){
+        result = (Result) resultObj;
+      } else if (resultObj instanceof ResultWritable) {
+        result = ((ResultWritable)resultObj).getResult();
+      } else {
+        throw new IllegalArgumentException("Illegal Argument " + (resultObj == null ? "null" : resultObj.getClass().getName()));
+      }
       System.out.println("Result " + result.toString());
       List<KeyValue> list = result.list();
       boolean correctValues = (list.size() == 1)

Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java Wed Nov 13 16:47:11 2013
@@ -210,7 +210,7 @@ public class TestRevisionManager extends
     String tableName = newTableName("testTable");
     List<String> columnFamilies = Arrays.asList("cf1", "cf2");
     Transaction txn = manager.beginWriteTransaction(tableName,
-      columnFamilies, 40);
+      columnFamilies, 40L);
     Thread.sleep(100);
     try {
       manager.commitWriteTransaction(txn);

Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManagerEndpoint.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManagerEndpoint.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManagerEndpoint.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManagerEndpoint.java Wed Nov 13 16:47:11 2013
@@ -20,6 +20,7 @@ package org.apache.hcatalog.hbase.snapsh
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -135,13 +136,13 @@ public class TestRevisionManagerEndpoint
     @Override
     public Transaction beginWriteTransaction(String table,
                          List<String> families) throws IOException {
-      return recordCall(null, table, families);
+      return recordCall(new Transaction(table, families, 0L, 0L), table, families);
     }
 
     @Override
     public Transaction beginWriteTransaction(String table,
-                         List<String> families, long keepAlive) throws IOException {
-      return recordCall(null, table, families, keepAlive);
+                         List<String> families, Long keepAlive) throws IOException {
+      return recordCall(new Transaction(table, families, 0L, 0L), table, families, keepAlive);
     }
 
     @Override
@@ -157,17 +158,17 @@ public class TestRevisionManagerEndpoint
     @Override
     public List<FamilyRevision> getAbortedWriteTransactions(String table,
                                 String columnFamily) throws IOException {
-      return null;
+      return Collections.singletonList(new FamilyRevision(0L, 0L));
     }
 
     @Override
     public TableSnapshot createSnapshot(String tableName)
       throws IOException {
-      return null;
+      return createSnapshot(tableName, 0L);
     }
 
     @Override
-    public TableSnapshot createSnapshot(String tableName, long revision)
+    public TableSnapshot createSnapshot(String tableName, Long revision)
       throws IOException {
       TableSnapshot ret = new TableSnapshot(tableName, new HashMap<String, Long>(), revision);
       return recordCall(ret, tableName, revision);
@@ -201,7 +202,7 @@ public class TestRevisionManagerEndpoint
     Assert.assertEquals(call.methodName, call, mockImpl.lastCall);
 
     call = new MockRM.Invocation("createSnapshot", null, "t3", 1L);
-    call.ret = rm.createSnapshot("t3", 1);
+    call.ret = rm.createSnapshot("t3", 1L);
     Assert.assertEquals(call.methodName, call, mockImpl.lastCall);
 
   }

Modified: hive/trunk/itests/hcatalog-unit/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hcatalog-unit/pom.xml?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/itests/hcatalog-unit/pom.xml (original)
+++ hive/trunk/itests/hcatalog-unit/pom.xml Wed Nov 13 16:47:11 2013
@@ -97,18 +97,6 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase</artifactId>
-      <version>${hbase.version}</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.thrift</groupId>
-          <artifactId>libthrift</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <version>${junit.version}</version>
@@ -143,6 +131,64 @@
           <scope>test</scope>
         </dependency>
         <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-client</artifactId>
+          <version>${hbase.hadoop1.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-common</artifactId>
+          <version>${hbase.hadoop1.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-common</artifactId>
+          <version>${hbase.hadoop1.version}</version>
+          <scope>test</scope>
+          <classifier>tests</classifier>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-hadoop-compat</artifactId>
+          <version>${hbase.hadoop1.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-hadoop-compat</artifactId>
+          <version>${hbase.hadoop1.version}</version>
+          <scope>test</scope>
+          <classifier>tests</classifier>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-hadoop1-compat</artifactId>
+          <version>${hbase.hadoop1.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-hadoop1-compat</artifactId>
+          <version>${hbase.hadoop1.version}</version>
+          <scope>test</scope>
+          <classifier>tests</classifier>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-server</artifactId>
+          <version>${hbase.hadoop1.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-server</artifactId>
+          <version>${hbase.hadoop1.version}</version>
+          <classifier>tests</classifier>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
           <groupId>org.apache.pig</groupId>
           <artifactId>pig</artifactId>
           <version>${pig.version}</version>
@@ -199,6 +245,64 @@
           <scope>test</scope>
         </dependency>
         <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-client</artifactId>
+          <version>${hbase.hadoop2.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-common</artifactId>
+          <version>${hbase.hadoop2.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-common</artifactId>
+          <version>${hbase.hadoop2.version}</version>
+          <scope>test</scope>
+          <classifier>tests</classifier>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-hadoop-compat</artifactId>
+          <version>${hbase.hadoop2.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-hadoop-compat</artifactId>
+          <version>${hbase.hadoop2.version}</version>
+          <scope>test</scope>
+          <classifier>tests</classifier>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-hadoop2-compat</artifactId>
+          <version>${hbase.hadoop2.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-hadoop2-compat</artifactId>
+          <version>${hbase.hadoop2.version}</version>
+          <scope>test</scope>
+          <classifier>tests</classifier>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-server</artifactId>
+          <version>${hbase.hadoop2.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-server</artifactId>
+          <version>${hbase.hadoop2.version}</version>
+          <classifier>tests</classifier>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
           <groupId>org.apache.pig</groupId>
           <artifactId>pig</artifactId>
           <version>${pig.version}</version>

Modified: hive/trunk/itests/qtest/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/itests/qtest/pom.xml?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/itests/qtest/pom.xml (original)
+++ hive/trunk/itests/qtest/pom.xml Wed Nov 13 16:47:11 2013
@@ -105,19 +105,6 @@
       <version>${junit.version}</version>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase</artifactId>
-      <version>${hbase.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase</artifactId>
-      <version>${hbase.version}</version>
-      <scope>test</scope>
-      <classifier>tests</classifier>
-    </dependency>
   </dependencies>
 
   <profiles>
@@ -156,6 +143,58 @@
           <version>${hadoop-20S.version}</version>
           <scope>test</scope>
         </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-common</artifactId>
+          <version>${hbase.hadoop1.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-common</artifactId>
+          <version>${hbase.hadoop1.version}</version>
+          <scope>test</scope>
+          <classifier>tests</classifier>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-hadoop-compat</artifactId>
+          <version>${hbase.hadoop1.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-hadoop-compat</artifactId>
+          <version>${hbase.hadoop1.version}</version>
+          <scope>test</scope>
+          <classifier>tests</classifier>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-hadoop1-compat</artifactId>
+          <version>${hbase.hadoop1.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-hadoop1-compat</artifactId>
+          <version>${hbase.hadoop1.version}</version>
+          <scope>test</scope>
+          <classifier>tests</classifier>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-server</artifactId>
+          <version>${hbase.hadoop1.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-server</artifactId>
+          <version>${hbase.hadoop1.version}</version>
+          <classifier>tests</classifier>
+          <scope>test</scope>
+        </dependency>
       </dependencies>
     </profile>
    <profile>
@@ -191,6 +230,58 @@
           <version>${hadoop-23.version}</version>
           <scope>test</scope>
         </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-common</artifactId>
+          <version>${hbase.hadoop2.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-common</artifactId>
+          <version>${hbase.hadoop2.version}</version>
+          <scope>test</scope>
+          <classifier>tests</classifier>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-hadoop-compat</artifactId>
+          <version>${hbase.hadoop2.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-hadoop-compat</artifactId>
+          <version>${hbase.hadoop2.version}</version>
+          <scope>test</scope>
+          <classifier>tests</classifier>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-hadoop2-compat</artifactId>
+          <version>${hbase.hadoop2.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-hadoop2-compat</artifactId>
+          <version>${hbase.hadoop2.version}</version>
+          <scope>test</scope>
+          <classifier>tests</classifier>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-server</artifactId>
+          <version>${hbase.hadoop2.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-server</artifactId>
+          <version>${hbase.hadoop2.version}</version>
+          <classifier>tests</classifier>
+          <scope>test</scope>
+        </dependency>
       </dependencies>
     </profile>
   </profiles>