Posted to hcatalog-commits@incubator.apache.org by to...@apache.org on 2012/03/03 02:56:06 UTC

svn commit: r1296568 [1/3] - in /incubator/hcatalog/trunk: ./ src/java/org/apache/hcatalog/mapreduce/ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ storage-drivers/hbase/src...

Author: toffer
Date: Sat Mar  3 02:56:05 2012
New Revision: 1296568

URL: http://svn.apache.org/viewvc?rev=1296568&view=rev
Log:
HCAT-252 Rework HBase storage driver into HBase storage handler (rohini via toffer)

Added:
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java
Removed:
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java.broken
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java.broken
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java.broken
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java.broken
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1296568&r1=1296567&r2=1296568&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Sat Mar  3 02:56:05 2012
@@ -33,6 +33,8 @@ Trunk (unreleased changes)
 Release 0.4.0 - Unreleased
 
   INCOMPATIBLE CHANGES
+  HCAT-252 Rework HBase storage driver into HBase storage handler (rohini via toffer) 
+
   HCAT-265 remove deprecated HCatStorageHandler (toffer)
 
   HCAT-239. Changes to HCatInputFormat to make it use SerDes instead of StorageDrivers (vikram.dixit via gates)

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java?rev=1296568&r1=1296567&r2=1296568&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java Sat Mar  3 02:56:05 2012
@@ -20,11 +20,12 @@ package org.apache.hcatalog.mapreduce;
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.mapred.HCatMapRedUtil;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hcatalog.common.HCatConstants;
@@ -36,6 +37,8 @@ import org.apache.hcatalog.common.HCatUt
  */
 class DefaultOutputCommitterContainer extends OutputCommitterContainer {
 
+    private static final Log LOG = LogFactory.getLog(DefaultOutputCommitterContainer.class);
+
     /**
      * @param context current JobContext
      * @param baseCommitter OutputCommitter to contain
@@ -86,11 +89,9 @@ class DefaultOutputCommitterContainer ex
     public void cleanupJob(JobContext context) throws IOException {
         getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context));
 
-        OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
-
         //Cancel HCat and JobTracker tokens
         try {
-            HiveConf hiveConf = HCatUtil.getHiveConf(null, 
+            HiveConf hiveConf = HCatUtil.getHiveConf(null,
                                                   context.getConfiguration());
             HiveMetaStoreClient client = HCatUtil.createHiveClient(hiveConf);
             String tokenStrForm = client.getTokenStrForm();
@@ -98,7 +99,7 @@ class DefaultOutputCommitterContainer ex
               client.cancelDelegationToken(tokenStrForm);
             }
         } catch (Exception e) {
-            throw new IOException("Failed to cancel delegation token",e);
+            LOG.warn("Failed to cancel delegation token", e);
         }
     }
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java?rev=1296568&r1=1296567&r2=1296568&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java Sat Mar  3 02:56:05 2012
@@ -30,6 +30,7 @@ import org.apache.hcatalog.common.HCatUt
 import org.apache.hcatalog.data.HCatRecord;
 
 import java.io.IOException;
+import java.text.NumberFormat;
 
 /**
  * Bare bones implementation of OutputFormatContainer. Does only the required
@@ -38,10 +39,20 @@ import java.io.IOException;
  */
 class DefaultOutputFormatContainer extends OutputFormatContainer {
 
+    private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+
+    static {
+      NUMBER_FORMAT.setMinimumIntegerDigits(5);
+      NUMBER_FORMAT.setGroupingUsed(false);
+    }
+
     public DefaultOutputFormatContainer(org.apache.hadoop.mapred.OutputFormat<WritableComparable<?>, Writable> of) {
         super(of);
     }
 
+    static synchronized String getOutputName(int partition) {
+        return "part-" + NUMBER_FORMAT.format(partition);
+      }
 
     /**
      * Get the record writer for the job. Uses the Table's default OutputStorageDriver
@@ -53,8 +64,9 @@ class DefaultOutputFormatContainer exten
     @Override
     public RecordWriter<WritableComparable<?>, HCatRecord>
     getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+        String name = getOutputName(context.getTaskAttemptID().getTaskID().getId());
         return new DefaultRecordWriterContainer(context,
-                getBaseOutputFormat().getRecordWriter(null, new JobConf(context.getConfiguration()),null, InternalUtil.createReporter(context)));
+                getBaseOutputFormat().getRecordWriter(null, new JobConf(context.getConfiguration()), name, InternalUtil.createReporter(context)));
     }
 
 

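For reference, the NumberFormat configured above produces Hadoop-style part-file names for each task. A minimal standalone sketch of the same logic (hypothetical demo class, not part of this patch):

    import java.text.NumberFormat;

    public class OutputNameDemo {
        // Same setup as DefaultOutputFormatContainer: at least five integer
        // digits, no grouping separators.
        private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
        static {
            NUMBER_FORMAT.setMinimumIntegerDigits(5);
            NUMBER_FORMAT.setGroupingUsed(false);
        }

        static String getOutputName(int partition) {
            return "part-" + NUMBER_FORMAT.format(partition);
        }

        public static void main(String[] args) {
            System.out.println(getOutputName(0));  // part-00000
            System.out.println(getOutputName(42)); // part-00042
        }
    }

This gives the wrapped record writer a real per-task file name in place of the null name that was passed before.
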
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java?rev=1296568&r1=1296567&r2=1296568&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java Sat Mar  3 02:56:05 2012
@@ -72,7 +72,7 @@ class DefaultRecordWriterContainer exten
     public void write(WritableComparable<?> key, HCatRecord value) throws IOException,
             InterruptedException {
         try {
-            getBaseRecordWriter().write(null, serDe.serialize(value, hcatRecordOI));
+            getBaseRecordWriter().write(null, serDe.serialize(value.getAll(), hcatRecordOI));
         } catch (SerDeException e) {
             throw new IOException("Failed to serialize object",e);
         }

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java?rev=1296568&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java Sat Mar  3 02:56:05 2012
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+
+public class HBaseBaseOutputFormat implements OutputFormat<WritableComparable<?>, Put>,
+        HiveOutputFormat<WritableComparable<?>, Put> {
+
+    @Override
+    public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+            JobConf jc, Path finalOutPath,
+            Class<? extends Writable> valueClass, boolean isCompressed,
+            Properties tableProperties, Progressable progress)
+            throws IOException {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
+        OutputFormat<WritableComparable<?>, Put> outputFormat = getOutputFormat(job);
+        outputFormat.checkOutputSpecs(ignored, job);
+    }
+
+    @Override
+    public RecordWriter<WritableComparable<?>, Put> getRecordWriter(FileSystem ignored,
+            JobConf job, String name, Progressable progress) throws IOException {
+        OutputFormat<WritableComparable<?>, Put> outputFormat = getOutputFormat(job);
+        return outputFormat.getRecordWriter(ignored, job, name, progress);
+    }
+
+    private OutputFormat<WritableComparable<?>, Put> getOutputFormat(JobConf job)
+            throws IOException {
+        String outputInfo = job.get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+        OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(outputInfo);
+        OutputFormat<WritableComparable<?>, Put> outputFormat = null;
+        if (HBaseHCatStorageHandler.isBulkMode(outputJobInfo)) {
+            outputFormat = new HBaseBulkOutputFormat();
+        } else {
+            outputFormat = new HBaseDirectOutputFormat();
+        }
+        return outputFormat;
+    }
+}

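As a rough sketch of how the new base output format dispatches, it deserializes OutputJobInfo from the job configuration and picks bulk or direct mode from a storer property. The key below is a hypothetical stand-in for HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY, whose literal value is not shown in this diff:

    import java.util.Properties;

    public class BulkModeDispatchDemo {
        // Hypothetical stand-in for HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY.
        static final String BULK_MODE_KEY = "hbase.osd.bulk.mode";

        // Mirrors HBaseHCatStorageHandler.isBulkMode(): bulk load only when the
        // storer property is exactly "true"; anything else means direct puts.
        static boolean isBulkMode(Properties storerProperties) {
            return "true".equals(storerProperties.getProperty(BULK_MODE_KEY, "false"));
        }

        public static void main(String[] args) {
            Properties storer = new Properties();
            System.out.println(isBulkMode(storer)); // false -> HBaseDirectOutputFormat
            storer.setProperty(BULK_MODE_KEY, "true");
            System.out.println(isBulkMode(storer)); // true  -> HBaseBulkOutputFormat
        }
    }
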
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java?rev=1296568&r1=1296567&r2=1296568&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java Sat Mar  3 02:56:05 2012
@@ -1,160 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hcatalog.hbase;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.util.Progressable;
 import org.apache.hcatalog.hbase.snapshot.RevisionManager;
-import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
-import org.apache.hcatalog.hbase.snapshot.Transaction;
-import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.OutputJobInfo;
-
-import java.io.IOException;
 
 /**
- * Class which imports data into HBase via it's "bulk load" feature. Wherein regions
- * are created by the MR job using HFileOutputFormat and then later "moved" into
- * the appropriate region server.
+ * Class which imports data into HBase via its "bulk load" feature, wherein
+ * regions are created by the MR job using HFileOutputFormat and then later
+ * "moved" into the appropriate region server.
  */
-class HBaseBulkOutputFormat extends OutputFormat<WritableComparable<?>,Put> {
-    private final static ImmutableBytesWritable EMPTY_LIST = new ImmutableBytesWritable(new byte[0]);
-    private SequenceFileOutputFormat<WritableComparable<?>,Put> baseOutputFormat;
-    private final static Log LOG = LogFactory.getLog(HBaseBulkOutputFormat.class);
+class HBaseBulkOutputFormat extends HBaseBaseOutputFormat {
+
+    private final static ImmutableBytesWritable EMPTY_LIST = new ImmutableBytesWritable(
+            new byte[0]);
+    private SequenceFileOutputFormat<WritableComparable<?>, Put> baseOutputFormat;
 
     public HBaseBulkOutputFormat() {
-        baseOutputFormat = new SequenceFileOutputFormat<WritableComparable<?>,Put>();
+        baseOutputFormat = new SequenceFileOutputFormat<WritableComparable<?>, Put>();
     }
 
     @Override
-    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
-        baseOutputFormat.checkOutputSpecs(context);
-        //Get jobTracker delegation token if security is enabled
-        //we need to launch the ImportSequenceFile job
-        if(context.getConfiguration().getBoolean("hadoop.security.authorization",false)) {
-            JobClient jobClient = new JobClient(new JobConf(context.getConfiguration()));
-            context.getCredentials().addToken(new Text("my mr token"), jobClient.getDelegationToken(null));
-        }
+    public void checkOutputSpecs(FileSystem ignored, JobConf job)
+            throws IOException {
+        job.setOutputKeyClass(ImmutableBytesWritable.class);
+        job.setOutputValueClass(Put.class);
+        job.setOutputCommitter(HBaseBulkOutputCommitter.class);
+        baseOutputFormat.checkOutputSpecs(ignored, job);
+        getJTDelegationToken(job);
     }
 
     @Override
-    public RecordWriter<WritableComparable<?>, Put> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
-        //TODO use a constant/static setter when available
-        context.getConfiguration().setClass("mapred.output.key.class",ImmutableBytesWritable.class,Object.class);
-        context.getConfiguration().setClass("mapred.output.value.class",Put.class,Object.class);
-        return new HBaseBulkRecordWriter(baseOutputFormat.getRecordWriter(context));
+    public RecordWriter<WritableComparable<?>, Put> getRecordWriter(
+            FileSystem ignored, JobConf job, String name, Progressable progress)
+            throws IOException {
+        long version = HBaseRevisionManagerUtil.getOutputRevision(job);
+        return new HBaseBulkRecordWriter(baseOutputFormat.getRecordWriter(
+                ignored, job, name, progress), version);
     }
 
-    @Override
-    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
-        return new HBaseBulkOutputCommitter(baseOutputFormat.getOutputCommitter(context));
+    private void getJTDelegationToken(JobConf job) throws IOException {
+        // Get jobTracker delegation token if security is enabled
+        // we need to launch the ImportSequenceFile job
+        if (job.getBoolean("hadoop.security.authorization", false)) {
+            JobClient jobClient = new JobClient(new JobConf(job));
+            try {
+                job.getCredentials().addToken(new Text("my mr token"),
+                        jobClient.getDelegationToken(null));
+            } catch (InterruptedException e) {
+                throw new IOException("Error while getting JT delegation token", e);
+            }
+        }
     }
 
-    private static class HBaseBulkRecordWriter extends  RecordWriter<WritableComparable<?>,Put> {
-        private RecordWriter<WritableComparable<?>,Put> baseWriter;
+    private static class HBaseBulkRecordWriter implements
+            RecordWriter<WritableComparable<?>, Put> {
 
-        public HBaseBulkRecordWriter(RecordWriter<WritableComparable<?>,Put> baseWriter)  {
+        private RecordWriter<WritableComparable<?>, Put> baseWriter;
+        private final Long outputVersion;
+
+        public HBaseBulkRecordWriter(
+                RecordWriter<WritableComparable<?>, Put> baseWriter,
+                Long outputVersion) {
             this.baseWriter = baseWriter;
+            this.outputVersion = outputVersion;
         }
 
         @Override
-        public void write(WritableComparable<?> key, Put value) throws IOException, InterruptedException {
-            //we ignore the key
-            baseWriter.write(EMPTY_LIST, value);
+        public void write(WritableComparable<?> key, Put value)
+                throws IOException {
+            Put put = value;
+            if (outputVersion != null) {
+                put = new Put(value.getRow(), outputVersion.longValue());
+                for (List<KeyValue> row : value.getFamilyMap().values()) {
+                    for (KeyValue el : row) {
+                        put.add(el.getFamily(), el.getQualifier(), el.getValue());
+                    }
+                }
+            }
+            // we ignore the key
+            baseWriter.write(EMPTY_LIST, put);
         }
 
         @Override
-        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-            baseWriter.close(context);
+        public void close(Reporter reporter) throws IOException {
+            baseWriter.close(reporter);
         }
     }
 
-    private static class HBaseBulkOutputCommitter extends OutputCommitter {
-        private OutputCommitter baseOutputCommitter;
+    public static class HBaseBulkOutputCommitter extends OutputCommitter {
+
+        private final OutputCommitter baseOutputCommitter;
 
-        public HBaseBulkOutputCommitter(OutputCommitter baseOutputCommitter) throws IOException {
-            this.baseOutputCommitter = baseOutputCommitter;
+        public HBaseBulkOutputCommitter() {
+            baseOutputCommitter = new FileOutputCommitter();
         }
 
         @Override
-        public void abortTask(TaskAttemptContext context) throws IOException {
-            baseOutputCommitter.abortTask(context);
+        public void abortTask(TaskAttemptContext taskContext)
+                throws IOException {
+            baseOutputCommitter.abortTask(taskContext);
         }
 
         @Override
-        public void commitTask(TaskAttemptContext context) throws IOException {
-            baseOutputCommitter.commitTask(context);
+        public void commitTask(TaskAttemptContext taskContext)
+                throws IOException {
+            baseOutputCommitter.commitTask(taskContext);
         }
 
         @Override
-        public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
-            return baseOutputCommitter.needsTaskCommit(context);
+        public boolean needsTaskCommit(TaskAttemptContext taskContext)
+                throws IOException {
+            return baseOutputCommitter.needsTaskCommit(taskContext);
         }
 
         @Override
-        public void setupJob(JobContext context) throws IOException {
-            baseOutputCommitter.setupJob(context);
+        public void setupJob(JobContext jobContext) throws IOException {
+            baseOutputCommitter.setupJob(jobContext);
         }
 
         @Override
-        public void setupTask(TaskAttemptContext context) throws IOException {
-            baseOutputCommitter.setupTask(context);
+        public void setupTask(TaskAttemptContext taskContext)
+                throws IOException {
+            baseOutputCommitter.setupTask(taskContext);
         }
 
         @Override
-        public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+        public void abortJob(JobContext jobContext, int status)
+                throws IOException {
+            baseOutputCommitter.abortJob(jobContext, status);
             RevisionManager rm = null;
             try {
-                baseOutputCommitter.abortJob(jobContext,state);
-                rm = HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
-                rm.abortWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
+                rm = HBaseRevisionManagerUtil
+                        .getOpenedRevisionManager(jobContext.getConfiguration());
+                rm.abortWriteTransaction(HBaseRevisionManagerUtil
+                        .getWriteTransaction(jobContext.getConfiguration()));
             } finally {
                 cleanIntermediate(jobContext);
-                if(rm != null)
+                if (rm != null)
                     rm.close();
             }
         }
 
         @Override
         public void commitJob(JobContext jobContext) throws IOException {
+            baseOutputCommitter.commitJob(jobContext);
             RevisionManager rm = null;
             try {
-                baseOutputCommitter.commitJob(jobContext);
                 Configuration conf = jobContext.getConfiguration();
-                Path srcPath = FileOutputFormat.getOutputPath(jobContext);
-                Path destPath = new Path(srcPath.getParent(),srcPath.getName()+"_hfiles");
+                Path srcPath = FileOutputFormat.getOutputPath(jobContext.getJobConf());
+                Path destPath = new Path(srcPath.getParent(), srcPath.getName() + "_hfiles");
                 ImportSequenceFile.runJob(jobContext,
-                                          conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY),
-                                          srcPath,
-                                          destPath);
-                rm = HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
-                rm.commitWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
+                                conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY),
+                                srcPath,
+                                destPath);
+                rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+                rm.commitWriteTransaction(HBaseRevisionManagerUtil.getWriteTransaction(conf));
                 cleanIntermediate(jobContext);
             } finally {
-                if(rm != null)
+                if (rm != null)
                     rm.close();
             }
         }
 
-        public void cleanIntermediate(JobContext jobContext) throws IOException {
+        private void cleanIntermediate(JobContext jobContext)
+                throws IOException {
             FileSystem fs = FileSystem.get(jobContext.getConfiguration());
-            fs.delete(FileOutputFormat.getOutputPath(jobContext),true);
+            fs.delete(FileOutputFormat.getOutputPath(jobContext.getJobConf()), true);
         }
     }
 }

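Both record writers in this patch rewrite each incoming Put so that every cell carries the open write transaction's revision number as its HBase timestamp. A small self-contained sketch of that rewrite, using the same client API as the code above (demo class and revision value are made up):

    import java.util.List;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class RevisionStampDemo {
        // Mirrors HBaseBulkRecordWriter/HBaseDirectRecordWriter: copy every cell
        // of the incoming Put into a new Put stamped with the revision number.
        static Put stampWithRevision(Put incoming, long revision) {
            Put stamped = new Put(incoming.getRow(), revision);
            for (List<KeyValue> familyCells : incoming.getFamilyMap().values()) {
                for (KeyValue kv : familyCells) {
                    stamped.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
                }
            }
            return stamped;
        }

        public static void main(String[] args) {
            Put original = new Put(Bytes.toBytes("row1"));
            original.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes("v"));
            Put stamped = stampWithRevision(original, 7L); // 7 stands in for a real revision
            KeyValue kv = stamped.getFamilyMap().values().iterator().next().get(0);
            System.out.println(kv.getTimestamp()); // prints 7
        }
    }

The job-level committers then commit or abort that revision through the RevisionManager, presumably so that snapshot readers can distinguish committed cells from aborted ones.
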
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java?rev=1296568&r1=1296567&r2=1296568&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java Sat Mar  3 02:56:05 2012
@@ -18,104 +18,135 @@
 
 package org.apache.hcatalog.hbase;
 
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
-import org.apache.hadoop.io.Writable;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.util.Progressable;
 import org.apache.hcatalog.hbase.snapshot.RevisionManager;
-
-
-import java.io.IOException;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
 
 /**
- * "Direct" implementation of OutputFormat for HBase. Uses HTable client's put API to write each row to HBase one a
- * time. Presently it is just using TableOutputFormat as the underlying implementation in the future we can
- * tune this to make the writes faster such as permanently disabling WAL, caching, etc.
+ * "Direct" implementation of OutputFormat for HBase. Uses the HTable client's
+ * put API to write each row to HBase one at a time. Presently it just uses
+ * TableOutputFormat as the underlying implementation; in the future we can tune
+ * this to make the writes faster, such as by permanently disabling WAL, caching,
+ * etc.
  */
-class HBaseDirectOutputFormat extends OutputFormat<WritableComparable<?>,Writable> implements Configurable {
+class HBaseDirectOutputFormat extends HBaseBaseOutputFormat {
 
-    private TableOutputFormat<WritableComparable<?>> outputFormat;
+    private TableOutputFormat outputFormat;
 
     public HBaseDirectOutputFormat() {
-        this.outputFormat = new TableOutputFormat<WritableComparable<?>>();
+        this.outputFormat = new TableOutputFormat();
     }
 
     @Override
-    public RecordWriter<WritableComparable<?>, Writable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
-        return outputFormat.getRecordWriter(context);
+    public RecordWriter<WritableComparable<?>, Put> getRecordWriter(FileSystem ignored,
+            JobConf job, String name, Progressable progress)
+            throws IOException {
+        long version = HBaseRevisionManagerUtil.getOutputRevision(job);
+        return new HBaseDirectRecordWriter(outputFormat.getRecordWriter(ignored, job, name,
+                progress), version);
     }
 
     @Override
-    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
-        outputFormat.checkOutputSpecs(context);
-    }
+    public void checkOutputSpecs(FileSystem ignored, JobConf job)
+            throws IOException {
+        job.setOutputCommitter(HBaseDirectOutputCommitter.class);
+        job.setIfUnset(TableOutputFormat.OUTPUT_TABLE,
+                job.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY));
+        outputFormat.checkOutputSpecs(ignored, job);
+    }
+
+    private static class HBaseDirectRecordWriter implements
+            RecordWriter<WritableComparable<?>, Put> {
+
+        private RecordWriter<WritableComparable<?>, Put> baseWriter;
+        private final Long outputVersion;
+
+        public HBaseDirectRecordWriter(
+                RecordWriter<WritableComparable<?>, Put> baseWriter,
+                Long outputVersion) {
+            this.baseWriter = baseWriter;
+            this.outputVersion = outputVersion;
+        }
 
-    @Override
-    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
-        return new HBaseDirectOutputCommitter(outputFormat.getOutputCommitter(context));
-    }
+        @Override
+        public void write(WritableComparable<?> key, Put value)
+                throws IOException {
+            Put put = value;
+            if (outputVersion != null) {
+                put = new Put(value.getRow(), outputVersion.longValue());
+                for (List<KeyValue> row : value.getFamilyMap().values()) {
+                    for (KeyValue el : row) {
+                        put.add(el.getFamily(), el.getQualifier(), el.getValue());
+                    }
+                }
+            }
+            baseWriter.write(key, put);
+        }
 
-    @Override
-    public void setConf(Configuration conf) {
-        String tableName = conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY);
-        conf = new Configuration(conf);
-        conf.set(TableOutputFormat.OUTPUT_TABLE,tableName);
-        outputFormat.setConf(conf);
-    }
+        @Override
+        public void close(Reporter reporter) throws IOException {
+            baseWriter.close(reporter);
+        }
 
-    @Override
-    public Configuration getConf() {
-        return outputFormat.getConf();
     }
 
-    private static class HBaseDirectOutputCommitter extends OutputCommitter {
-        private OutputCommitter baseOutputCommitter;
+    public static class HBaseDirectOutputCommitter extends OutputCommitter {
 
-        public HBaseDirectOutputCommitter(OutputCommitter baseOutputCommitter) throws IOException {
-            this.baseOutputCommitter = baseOutputCommitter;
+        public HBaseDirectOutputCommitter() throws IOException {
         }
 
         @Override
-        public void abortTask(TaskAttemptContext context) throws IOException {
-            baseOutputCommitter.abortTask(context);
+        public void abortTask(TaskAttemptContext taskContext)
+                throws IOException {
         }
 
         @Override
-        public void commitTask(TaskAttemptContext context) throws IOException {
-            baseOutputCommitter.commitTask(context);
+        public void commitTask(TaskAttemptContext taskContext)
+                throws IOException {
         }
 
         @Override
-        public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
-            return baseOutputCommitter.needsTaskCommit(context);
+        public boolean needsTaskCommit(TaskAttemptContext taskContext)
+                throws IOException {
+            return false;
         }
 
         @Override
-        public void setupJob(JobContext context) throws IOException {
-            baseOutputCommitter.setupJob(context);
+        public void setupJob(JobContext jobContext) throws IOException {
         }
 
         @Override
-        public void setupTask(TaskAttemptContext context) throws IOException {
-            baseOutputCommitter.setupTask(context);
+        public void setupTask(TaskAttemptContext taskContext)
+                throws IOException {
         }
 
         @Override
-        public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+        public void abortJob(JobContext jobContext, int status)
+                throws IOException {
+            super.abortJob(jobContext, status);
             RevisionManager rm = null;
             try {
-                baseOutputCommitter.abortJob(jobContext, state);
-                rm = HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
-                rm.abortWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
+                rm = HBaseRevisionManagerUtil
+                        .getOpenedRevisionManager(jobContext.getConfiguration());
+                Transaction writeTransaction = HBaseRevisionManagerUtil
+                        .getWriteTransaction(jobContext.getConfiguration());
+                rm.abortWriteTransaction(writeTransaction);
             } finally {
-                if(rm != null)
+                if (rm != null)
                     rm.close();
             }
         }
@@ -124,11 +155,12 @@ class HBaseDirectOutputFormat extends Ou
         public void commitJob(JobContext jobContext) throws IOException {
             RevisionManager rm = null;
             try {
-                baseOutputCommitter.commitJob(jobContext);
-                rm = HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
-                rm.commitWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
+                rm = HBaseRevisionManagerUtil
+                        .getOpenedRevisionManager(jobContext.getConfiguration());
+                rm.commitWriteTransaction(HBaseRevisionManagerUtil.getWriteTransaction(jobContext
+                        .getConfiguration()));
             } finally {
-                if(rm != null)
+                if (rm != null)
                     rm.close();
             }
         }

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java?rev=1296568&r1=1296567&r2=1296568&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java Sat Mar  3 02:56:05 2012
@@ -19,22 +19,24 @@
 package org.apache.hcatalog.hbase;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hive.hbase.HBaseSerDe;
@@ -50,13 +52,11 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.hbase.snapshot.RevisionManager;
-import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
-import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
 import org.apache.hcatalog.hbase.snapshot.Transaction;
 import org.apache.hcatalog.hbase.snapshot.ZKBasedRevisionManager;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
@@ -74,22 +74,93 @@ import com.facebook.fb303.FacebookBase;
  * tables through HCatalog. The implementation is very similar to the
  * HiveHBaseStorageHandler, with more details to suit HCatalog.
  */
-public class HBaseHCatStorageHandler extends HCatStorageHandler implements HiveMetaHook {
+//TODO remove serializable when HCATALOG-282 is fixed
+public class HBaseHCatStorageHandler extends HCatStorageHandler implements HiveMetaHook, Serializable {
 
-    final static public String DEFAULT_PREFIX = "default.";
+    public final static String DEFAULT_PREFIX = "default.";
+    private final static String PROPERTY_INT_OUTPUT_LOCATION = "hcat.hbase.mapreduce.intermediateOutputLocation";
 
-    private Configuration      hbaseConf;
-
-    private HBaseAdmin         admin;
+    private transient Configuration      hbaseConf;
+    private transient HBaseAdmin         admin;
 
     @Override
     public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
-        //TODO complete rework and fill this in
+        // Populate jobProperties with input table name, table columns, RM snapshot,
+        // hbase-default.xml and hbase-site.xml
+        Map<String, String> tableJobProperties = tableDesc.getJobProperties();
+        String jobString = tableJobProperties.get(HCatConstants.HCAT_KEY_JOB_INFO);
+        try {
+            InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(jobString);
+            HCatTableInfo tableInfo = inputJobInfo.getTableInfo();
+            String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo);
+            jobProperties.put(TableInputFormat.INPUT_TABLE, qualifiedTableName);
+
+            Configuration jobConf = getConf();
+            String outputSchema = jobConf.get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
+            jobProperties.put(TableInputFormat.SCAN_COLUMNS, getScanColumns(tableInfo, outputSchema));
+
+            String serSnapshot = (String) inputJobInfo.getProperties().get(
+                    HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
+            if (serSnapshot == null) {
+                Configuration conf = addHbaseResources(jobConf);
+                HCatTableSnapshot snapshot = HBaseRevisionManagerUtil.createSnapshot(conf,
+                        qualifiedTableName, tableInfo);
+                jobProperties.put(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY,
+                        HCatUtil.serialize(snapshot));
+            }
+
+            addHbaseResources(jobConf, jobProperties);
+
+        } catch (IOException e) {
+            throw new IllegalStateException("Error while configuring job properties", e);
+        }
     }
 
     @Override
     public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
-        //TODO complete rework and fill this in
+        // Populate jobProperties with output table name, hbase-default.xml, hbase-site.xml, OutputJobInfo
+        // Populate RM transaction in OutputJobInfo
+        // In case of bulk mode, populate intermediate output location
+        Map<String, String> tableJobProperties = tableDesc.getJobProperties();
+        String jobString = tableJobProperties.get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+        try {
+            OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(jobString);
+            HCatTableInfo tableInfo = outputJobInfo.getTableInfo();
+            String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo);
+            jobProperties.put(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, qualifiedTableName);
+
+            Configuration jobConf = getConf();
+            String txnString = outputJobInfo.getProperties().getProperty(
+                    HBaseConstants.PROPERTY_WRITE_TXN_KEY);
+            if (txnString == null) {
+                Configuration conf = addHbaseResources(jobConf);
+                Transaction txn = HBaseRevisionManagerUtil.beginWriteTransaction(qualifiedTableName, tableInfo, conf);
+                outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
+                        HCatUtil.serialize(txn));
+
+                if (isBulkMode(outputJobInfo) && !(outputJobInfo.getProperties()
+                                .containsKey(PROPERTY_INT_OUTPUT_LOCATION))) {
+                    String tableLocation = tableInfo.getTableLocation();
+                    String location = new Path(tableLocation, "REVISION_" + txn.getRevisionNumber())
+                            .toString();
+                    outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION,
+                            location);
+                    // We are writing out an intermediate sequenceFile hence
+                    // location is not passed in OutputJobInfo.getLocation()
+                    // TODO replace this with a mapreduce constant when available
+                    jobProperties.put("mapred.output.dir", location);
+                }
+            }
+
+            jobProperties
+                    .put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
+            addHbaseResources(jobConf, jobProperties);
+            addOutputDependencyJars(jobConf);
+            jobProperties.put("tmpjars", jobConf.get("tmpjars"));
+
+        } catch (IOException e) {
+            throw new IllegalStateException("Error while configuring job properties", e);
+        }
     }
 
     /*
@@ -231,7 +302,7 @@ public class HBaseHCatStorageHandler ext
             new HTable(hbaseConf, tableDesc.getName());
 
             //Set up znodes in revision manager.
-            RevisionManager rm = getOpenedRevisionManager(hbaseConf);
+            RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(hbaseConf);
             if (rm instanceof ZKBasedRevisionManager) {
                 ZKBasedRevisionManager zkRM = (ZKBasedRevisionManager) rm;
                 zkRM.setUpZNodes(tableName, new ArrayList<String>(
@@ -295,36 +366,6 @@ public class HBaseHCatStorageHandler ext
         return this;
     }
 
-//TODO finish rework remove this
-//    /*
-//     * @param tableDesc
-//     *
-//     * @param jobProperties
-//     *
-//     * @see org.apache.hcatalog.storagehandler.HCatStorageHandler
-//     * #configureTableJobProperties(org.apache.hadoop.hive.ql.plan.TableDesc,
-//     * java.util.Map)
-//     */
-//    @Override
-//    public void configureTableJobProperties(TableDesc tableDesc,
-//            Map<String, String> jobProperties) {
-//        Properties tableProperties = tableDesc.getProperties();
-//
-//        jobProperties.put(HBaseSerDe.HBASE_COLUMNS_MAPPING,
-//                tableProperties.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING));
-//
-//        String tableName = tableProperties
-//                .getProperty(HBaseSerDe.HBASE_TABLE_NAME);
-//        if (tableName == null) {
-//            tableName = tableProperties.getProperty(Constants.META_TABLE_NAME);
-//            if (tableName.startsWith(DEFAULT_PREFIX)) {
-//                tableName = tableName.substring(DEFAULT_PREFIX.length());
-//            }
-//        }
-//        jobProperties.put(HBaseSerDe.HBASE_TABLE_NAME, tableName);
-//
-//    }
-
     private HBaseAdmin getHBaseAdmin() throws MetaException {
         try {
             if (admin == null) {
@@ -356,14 +397,12 @@ public class HBaseHCatStorageHandler ext
 
     @Override
     public Class<? extends InputFormat> getInputFormatClass() {
-        //TODO replace this with rework
-        return InputFormat.class;
+        return HBaseInputFormat.class;
     }
 
     @Override
     public Class<? extends OutputFormat> getOutputFormatClass() {
-        //TODO replace this with rework
-        return SequenceFileOutputFormat.class;
+        return HBaseBaseOutputFormat.class;
     }
 
     /*
@@ -397,6 +436,7 @@ public class HBaseHCatStorageHandler ext
     private void checkDeleteTable(Table table) throws MetaException {
         boolean isExternal = MetaStoreUtils.isExternalTable(table);
         String tableName = getHBaseTableName(table);
+        RevisionManager rm = null;
         try {
             if (!isExternal && getHBaseAdmin().tableExists(tableName)) {
                 // we have created an HBase table, so we delete it to roll back;
@@ -406,7 +446,7 @@ public class HBaseHCatStorageHandler ext
                 getHBaseAdmin().deleteTable(tableName);
 
               //Set up znodes in revision manager.
-                RevisionManager rm = getOpenedRevisionManager(hbaseConf);
+                rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(hbaseConf);
                 if (rm instanceof ZKBasedRevisionManager) {
                     ZKBasedRevisionManager zkRM = (ZKBasedRevisionManager) rm;
                     zkRM.deleteZNodes(tableName);
@@ -414,6 +454,8 @@ public class HBaseHCatStorageHandler ext
             }
         } catch (IOException ie) {
             throw new MetaException(StringUtils.stringifyException(ie));
+        } finally {
+            HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm);
         }
     }
 
@@ -436,9 +478,7 @@ public class HBaseHCatStorageHandler ext
      * @param conf
      * @throws IOException
      */
-    public static void addDependencyJars(Configuration conf) throws IOException {
-        //TODO provide a facility/interface for loading/specifying dependencies
-        //Ideally this method shouldn't be exposed to the user
+    private void addOutputDependencyJars(Configuration conf) throws IOException {
         TableMapReduceUtil.addDependencyJars(conf,
                 //hadoop-core
                 Writable.class,
@@ -452,8 +492,6 @@ public class HBaseHCatStorageHandler ext
                 HCatOutputFormat.class,
                 //hive hbase storage handler jar
                 HBaseSerDe.class,
-                //hcat hbase storage driver jar
-                HBaseOutputStorageDriver.class,
                 //hive jar
                 Table.class,
                 //libthrift jar
@@ -464,152 +502,86 @@ public class HBaseHCatStorageHandler ext
                 FacebookBase.class);
     }
 
-
     /**
-     * Creates the latest snapshot of the table.
-     *
-     * @param jobConf The job configuration.
-     * @param hbaseTableName The fully qualified name of the HBase table.
-     * @return An instance of HCatTableSnapshot
-     * @throws IOException Signals that an I/O exception has occurred.
-     */
-    public static HCatTableSnapshot createSnapshot(Configuration jobConf,
-            String hbaseTableName ) throws IOException {
-
-        RevisionManager rm = null;
-        TableSnapshot snpt;
-        try {
-            rm = getOpenedRevisionManager(jobConf);
-            snpt = rm.createSnapshot(hbaseTableName);
-        } finally {
-            if (rm != null)
-                rm.close();
-        }
-
-        String inputJobString = jobConf.get(HCatConstants.HCAT_KEY_JOB_INFO);
-        if(inputJobString == null){
-            throw new IOException(
-                    "InputJobInfo information not found in JobContext. "
-                            + "HCatInputFormat.setInput() not called?");
-        }
-        InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(inputJobString);
-        HCatTableSnapshot hcatSnapshot = HBaseInputStorageDriver
-                .convertSnapshot(snpt, inputInfo.getTableInfo());
-
-        return hcatSnapshot;
+     * Utility method to get a new Configuration with hbase-default.xml and hbase-site.xml added
+     * @param jobConf existing configuration
+     * @return a new Configuration with hbase-default.xml and hbase-site.xml added
+     */
+    private Configuration addHbaseResources(Configuration jobConf) {
+        Configuration conf = new Configuration(jobConf);
+        HBaseConfiguration.addHbaseResources(conf);
+        return conf;
     }
 
     /**
-     * Creates the snapshot using the revision specified by the user.
-     *
-     * @param jobConf The job configuration.
-     * @param tableName The fully qualified name of the table whose snapshot is being taken.
-     * @param revision The revision number to use for the snapshot.
-     * @return An instance of HCatTableSnapshot.
-     * @throws IOException Signals that an I/O exception has occurred.
-     */
-    public static HCatTableSnapshot createSnapshot(Configuration jobConf,
-            String tableName, long revision)
-            throws IOException {
-
-        TableSnapshot snpt;
-        RevisionManager rm = null;
-        try {
-            rm = getOpenedRevisionManager(jobConf);
-            snpt = rm.createSnapshot(tableName, revision);
-        } finally {
-            if (rm != null)
-                rm.close();
-        }
-
-        String inputJobString = jobConf.get(HCatConstants.HCAT_KEY_JOB_INFO);
-        if(inputJobString == null){
-            throw new IOException(
-                    "InputJobInfo information not found in JobContext. "
-                            + "HCatInputFormat.setInput() not called?");
+     * Utility method to add hbase-default.xml and hbase-site.xml properties to a new map
+     * if they are not already present in the jobConf.
+     * @param jobConf Job configuration
+     * @param newJobProperties  Map to which new properties should be added
+     */
+    private void addHbaseResources(Configuration jobConf,
+            Map<String, String> newJobProperties) {
+        Configuration conf = new Configuration(false);
+        HBaseConfiguration.addHbaseResources(conf);
+        for (Entry<String, String> entry : conf) {
+            if (jobConf.get(entry.getKey()) == null)
+                newJobProperties.put(entry.getKey(), entry.getValue());
         }
-        InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(inputJobString);
-        HCatTableSnapshot hcatSnapshot = HBaseInputStorageDriver
-                .convertSnapshot(snpt, inputInfo.getTableInfo());
-
-        return hcatSnapshot;
     }
 
-    /**
-     * Gets an instance of revision manager which is opened.
-     *
-     * @param jobConf The job configuration.
-     * @return RevisionManager An instance of revision manager.
-     * @throws IOException
-     */
-    static RevisionManager getOpenedRevisionManager(Configuration jobConf) throws IOException {
-
-        Properties properties = new Properties();
-        String zkHostList = jobConf.get(HConstants.ZOOKEEPER_QUORUM);
-        int port = jobConf.getInt("hbase.zookeeper.property.clientPort",
-                HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
-
-        if (zkHostList != null) {
-            String[] splits = zkHostList.split(",");
-            StringBuffer sb = new StringBuffer();
-            for (String split : splits) {
-                sb.append(split);
-                sb.append(':');
-                sb.append(port);
-                sb.append(',');
-            }
-
-            sb.deleteCharAt(sb.length() - 1);
-            properties.put(ZKBasedRevisionManager.HOSTLIST, sb.toString());
-        }
-        String dataDir = jobConf.get(ZKBasedRevisionManager.DATADIR);
-        if (dataDir != null) {
-            properties.put(ZKBasedRevisionManager.DATADIR, dataDir);
-        }
-        String rmClassName = jobConf.get(
-                RevisionManager.REVISION_MGR_IMPL_CLASS,
-                ZKBasedRevisionManager.class.getName());
-        properties.put(RevisionManager.REVISION_MGR_IMPL_CLASS, rmClassName);
-        RevisionManager revisionManger = RevisionManagerFactory
-                .getRevisionManager(properties);
-        revisionManger.open();
-        return revisionManger;
-    }
-
-    /**
-     * Set snapshot as a property.
-     *
-     * @param snapshot The HCatTableSnapshot to be passed to the job.
-     * @param inpJobInfo The InputJobInfo for the job.
-     * @throws IOException
-     */
-    public void setSnapshot(HCatTableSnapshot snapshot, InputJobInfo inpJobInfo)
-            throws IOException {
-        String serializedSnp = HCatUtil.serialize(snapshot);
-        inpJobInfo.getProperties().setProperty(
-                HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, serializedSnp);
-    }
-
-    static Transaction getWriteTransaction(Configuration conf) throws IOException {
-        OutputJobInfo outputJobInfo = (OutputJobInfo)HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
-        return (Transaction) HCatUtil.deserialize(outputJobInfo.getProperties()
-                                                               .getProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY));
-    }
-
-    static void setWriteTransaction(Configuration conf, Transaction txn) throws IOException {
-        OutputJobInfo outputJobInfo = (OutputJobInfo)HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
-        outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY, HCatUtil.serialize(txn));
-        conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
-    }
-
-    /**
-     * Get the Revision number that will be assigned to this job's output data
-     * @param conf configuration of the job
-     * @return the revision number used
-     * @throws IOException
-     */
-    public static long getOutputRevision(Configuration conf) throws IOException {
-        return getWriteTransaction(conf).getRevisionNumber();
+    public static boolean isBulkMode(OutputJobInfo outputJobInfo) {
+        //Default is false
+        String bulkMode = outputJobInfo.getTableInfo().getStorerInfo().getProperties()
+                .getProperty(HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY,
+                        "false");
+        return "true".equals(bulkMode);
+    }
+
+    private String getScanColumns(HCatTableInfo tableInfo, String outputColSchema) throws IOException {
+        StringBuilder builder = new StringBuilder();
+        String hbaseColumnMapping = tableInfo.getStorerInfo().getProperties()
+                .getProperty(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY);
+        if (outputColSchema == null) {
+            String[] splits = hbaseColumnMapping.split("[,]");
+            for (int i = 0; i < splits.length; i++) {
+                if (!splits[i].equals(HBaseSerDe.HBASE_KEY_COL))
+                    builder.append(splits[i]).append(" ");
+            }
+        } else {
+            HCatSchema outputSchema = (HCatSchema) HCatUtil.deserialize(outputColSchema);
+            HCatSchema tableSchema = tableInfo.getDataColumns();
+            List<String> outputFieldNames = outputSchema.getFieldNames();
+            List<Integer> outputColumnMapping = new ArrayList<Integer>();
+            for(String fieldName: outputFieldNames){
+                int position = tableSchema.getPosition(fieldName);
+                outputColumnMapping.add(position);
+            }
+            try {
+                List<String> columnFamilies = new ArrayList<String>();
+                List<String> columnQualifiers = new ArrayList<String>();
+                HBaseSerDe.parseColumnMapping(hbaseColumnMapping, columnFamilies, null,
+                        columnQualifiers, null);
+                for (int i = 0; i < outputColumnMapping.size(); i++) {
+                    int cfIndex = outputColumnMapping.get(i);
+                    String cf = columnFamilies.get(cfIndex);
+                    // We skip the key column.
+                    if (cf.equals(HBaseSerDe.HBASE_KEY_COL) == false) {
+                        String qualifier = columnQualifiers.get(cfIndex);
+                        builder.append(cf);
+                        builder.append(":");
+                        if (qualifier != null) {
+                            builder.append(qualifier);
+                        }
+                        builder.append(" ");
+                    }
+                }
+            } catch (SerDeException e) {
+                throw new IOException(e);
+            }
+        }
+        //Remove the extra space delimiter
+        builder.deleteCharAt(builder.length() - 1);
+        return builder.toString();
     }
 
 }
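
For reference, getScanColumns() above reduces the Hive HBase column mapping to the
space separated "family:qualifier" list that the table scan expects, skipping the
:key entry that maps the row key. A minimal standalone sketch of that transformation,
using a plain split instead of HBaseSerDe.parseColumnMapping, so it only covers
simple family:qualifier entries:

public class ScanColumnsSketch {

    // Build "info:name info:age" from ":key,info:name,info:age".
    static String toScanColumns(String hbaseColumnMapping) {
        StringBuilder builder = new StringBuilder();
        for (String mapping : hbaseColumnMapping.split(",")) {
            if (":key".equals(mapping)) {
                continue; // the row key is not a scanned column
            }
            builder.append(mapping).append(' ');
        }
        if (builder.length() > 0) {
            builder.deleteCharAt(builder.length() - 1); // drop the trailing space
        }
        return builder.toString();
    }

    public static void main(String[] args) {
        System.out.println(toScanColumns(":key,info:name,info:age"));
    }
}

Running it prints "info:name info:age", the same shape of string getScanColumns()
returns when no output column schema is given.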

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java?rev=1296568&r1=1296567&r2=1296568&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java Sat Mar  3 02:56:05 2012
@@ -21,33 +21,31 @@ package org.apache.hcatalog.hbase;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableSplit;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-import org.apache.hadoop.hbase.mapreduce.TableSplit;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapred.HCatMapRedUtil;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.mapreduce.InputJobInfo;
 
 /**
  * HBaseInputFormat is a thin wrapper around HBase's TableInputFormat.
  */
-class HBaseInputFormat extends InputFormat<ImmutableBytesWritable, Result> implements Configurable{
+class HBaseInputFormat implements InputFormat<ImmutableBytesWritable, Result> {
 
     private final TableInputFormat inputFormat;
-    private final InputJobInfo jobInfo;
-    private Configuration conf;
 
-    public HBaseInputFormat(InputJobInfo jobInfo) {
+    public HBaseInputFormat() {
         inputFormat = new TableInputFormat();
-        this.jobInfo = jobInfo;
     }
 
     /*
@@ -67,20 +65,27 @@ class HBaseInputFormat extends InputForm
      * org.apache.hadoop.mapreduce.TaskAttemptContext)
      */
     @Override
-    public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
-            InputSplit split, TaskAttemptContext tac) throws IOException,
-            InterruptedException {
-
-          String tableName = inputFormat.getConf().get(TableInputFormat.INPUT_TABLE);
-          TableSplit tSplit = (TableSplit) split;
-          HbaseSnapshotRecordReader recordReader = new HbaseSnapshotRecordReader(jobInfo);
-          Scan sc = new Scan(inputFormat.getScan());
-          sc.setStartRow(tSplit.getStartRow());
-          sc.setStopRow(tSplit.getEndRow());
-          recordReader.setScan(sc);
-          recordReader.setHTable(new HTable(this.conf, tableName));
-          recordReader.init();
-          return recordReader;
+    public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
+            InputSplit split, JobConf job, Reporter reporter)
+            throws IOException {
+        String jobString = job.get(HCatConstants.HCAT_KEY_JOB_INFO);
+        InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(jobString);
+
+        String tableName = job.get(TableInputFormat.INPUT_TABLE);
+        TableSplit tSplit = (TableSplit) split;
+        HbaseSnapshotRecordReader recordReader = new HbaseSnapshotRecordReader(inputJobInfo, job);
+        inputFormat.setConf(job);
+        Scan inputScan = inputFormat.getScan();
+        // TODO: Make the caching configurable by the user
+        inputScan.setCaching(200);
+        inputScan.setCacheBlocks(false);
+        Scan sc = new Scan(inputScan);
+        sc.setStartRow(tSplit.getStartRow());
+        sc.setStopRow(tSplit.getEndRow());
+        recordReader.setScan(sc);
+        recordReader.setHTable(new HTable(job, tableName));
+        recordReader.init();
+        return recordReader;
     }
 
     /*
@@ -97,35 +102,24 @@ class HBaseInputFormat extends InputForm
      * .JobContext)
      */
     @Override
-    public List<InputSplit> getSplits(JobContext jobContext)
-            throws IOException, InterruptedException {
-
-        String tableName = this.conf.get(TableInputFormat.INPUT_TABLE);
-        if (tableName == null) {
-           throw new IOException("The input table is not set. The input splits cannot be created.");
+    public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits)
+            throws IOException {
+        inputFormat.setConf(job);
+        return convertSplits(inputFormat.getSplits(HCatMapRedUtil.createJobContext(job, null,
+                Reporter.NULL)));
+    }
+
+    private InputSplit[] convertSplits(List<org.apache.hadoop.mapreduce.InputSplit> splits) {
+        InputSplit[] converted = new InputSplit[splits.size()];
+        for (int i = 0; i < splits.size(); i++) {
+            org.apache.hadoop.hbase.mapreduce.TableSplit tableSplit =
+                    (org.apache.hadoop.hbase.mapreduce.TableSplit) splits.get(i);
+            TableSplit newTableSplit = new TableSplit(tableSplit.getTableName(),
+                    tableSplit.getStartRow(),
+                    tableSplit.getEndRow(), tableSplit.getRegionLocation());
+            converted[i] = newTableSplit;
         }
-        return inputFormat.getSplits(jobContext);
-    }
-
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-        inputFormat.setConf(conf);
-    }
-
-    public Scan getScan() {
-        return inputFormat.getScan();
-    }
-
-    public void setScan(Scan scan) {
-        inputFormat.setScan(scan);
-    }
-
-    /* @return
-     * @see org.apache.hadoop.conf.Configurable#getConf()
-     */
-    @Override
-    public Configuration getConf() {
-       return this.conf;
+        return converted;
     }
 
 }
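
The getRecordReader() change hard codes a scan caching of 200 (note the TODO above).
A minimal sketch of how a user tunable value could be wired in, assuming a
hypothetical property name "hcat.hbase.scan.caching" that is not defined anywhere in
this patch:

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.mapred.JobConf;

class ScanCachingSketch {

    // Hypothetical property name, used here only for illustration.
    static final String SCAN_CACHING_PROPERTY = "hcat.hbase.scan.caching";

    // Apply a user supplied caching value to the scan, falling back to the 200 used above.
    static Scan applyCaching(JobConf job, Scan scan) {
        scan.setCaching(job.getInt(SCAN_CACHING_PROPERTY, 200));
        scan.setCacheBlocks(false); // keep one-off scans from churning the region server block cache
        return scan;
    }
}

A larger caching value cuts down round trips on full table scans at the cost of
client memory, and leaving block caching off keeps such scans from evicting hot
rows on the region servers.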

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java?rev=1296568&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java Sat Mar  3 02:56:05 2012
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
+import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
+import org.apache.hcatalog.hbase.snapshot.ZKBasedRevisionManager;
+import org.apache.hcatalog.mapreduce.HCatTableInfo;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.mapreduce.StorerInfo;
+
+
+/**
+ * The class HBaseRevisionManagerUtil provides utility methods for interacting with the Revision Manager.
+ *
+ */
+class HBaseRevisionManagerUtil {
+
+    private final static Log LOG = LogFactory.getLog(HBaseRevisionManagerUtil.class);
+
+    private HBaseRevisionManagerUtil() {
+    }
+
+    /**
+     * Creates the latest snapshot of the table.
+     *
+     * @param jobConf The job configuration.
+     * @param hbaseTableName The fully qualified name of the HBase table.
+     * @param tableInfo HCat table information
+     * @return An instance of HCatTableSnapshot
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    static HCatTableSnapshot createSnapshot(Configuration jobConf,
+            String hbaseTableName, HCatTableInfo tableInfo ) throws IOException {
+
+        RevisionManager rm = null;
+        TableSnapshot snpt;
+        try {
+            rm = getOpenedRevisionManager(jobConf);
+            snpt = rm.createSnapshot(hbaseTableName);
+        } finally {
+            closeRevisionManagerQuietly(rm);
+        }
+
+        HCatTableSnapshot hcatSnapshot = HBaseRevisionManagerUtil.convertSnapshot(snpt, tableInfo);
+        return hcatSnapshot;
+    }
+
+    /**
+     * Creates the snapshot using the revision specified by the user.
+     *
+     * @param jobConf The job configuration.
+     * @param tableName The fully qualified name of the table whose snapshot is being taken.
+     * @param revision The revision number to use for the snapshot.
+     * @return An instance of HCatTableSnapshot.
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    static HCatTableSnapshot createSnapshot(Configuration jobConf,
+            String tableName, long revision)
+            throws IOException {
+
+        TableSnapshot snpt;
+        RevisionManager rm = null;
+        try {
+            rm = getOpenedRevisionManager(jobConf);
+            snpt = rm.createSnapshot(tableName, revision);
+        } finally {
+            closeRevisionManagerQuietly(rm);
+        }
+
+        String inputJobString = jobConf.get(HCatConstants.HCAT_KEY_JOB_INFO);
+        if(inputJobString == null){
+            throw new IOException(
+                    "InputJobInfo information not found in JobContext. "
+                            + "HCatInputFormat.setInput() not called?");
+        }
+        InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(inputJobString);
+        HCatTableSnapshot hcatSnapshot = HBaseRevisionManagerUtil
+                .convertSnapshot(snpt, inputInfo.getTableInfo());
+
+        return hcatSnapshot;
+    }
+
+    /**
+     * Gets an instance of revision manager which is opened.
+     *
+     * @param jobConf The job configuration.
+     * @return RevisionManager An instance of revision manager.
+     * @throws IOException
+     */
+    static RevisionManager getOpenedRevisionManager(Configuration jobConf) throws IOException {
+
+        Properties properties = new Properties();
+        String zkHostList = jobConf.get(HConstants.ZOOKEEPER_QUORUM);
+        int port = jobConf.getInt("hbase.zookeeper.property.clientPort",
+                HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+
+        if (zkHostList != null) {
+            String[] splits = zkHostList.split(",");
+            StringBuffer sb = new StringBuffer();
+            for (String split : splits) {
+                sb.append(split);
+                sb.append(':');
+                sb.append(port);
+                sb.append(',');
+            }
+
+            sb.deleteCharAt(sb.length() - 1);
+            properties.put(ZKBasedRevisionManager.HOSTLIST, sb.toString());
+        }
+        String dataDir = jobConf.get(ZKBasedRevisionManager.DATADIR);
+        if (dataDir != null) {
+            properties.put(ZKBasedRevisionManager.DATADIR, dataDir);
+        }
+        String rmClassName = jobConf.get(
+                RevisionManager.REVISION_MGR_IMPL_CLASS,
+                ZKBasedRevisionManager.class.getName());
+        properties.put(RevisionManager.REVISION_MGR_IMPL_CLASS, rmClassName);
+        RevisionManager revisionManager = RevisionManagerFactory
+                .getRevisionManager(properties);
+        revisionManager.open();
+        return revisionManager;
+    }
+
+    static void closeRevisionManagerQuietly(RevisionManager rm) {
+        if (rm != null) {
+            try {
+                rm.close();
+            } catch (IOException e) {
+                LOG.warn("Error while trying to close revision manager", e);
+            }
+        }
+    }
+
+
+    static HCatTableSnapshot convertSnapshot(TableSnapshot hbaseSnapshot,
+            HCatTableInfo hcatTableInfo) throws IOException {
+
+        HCatSchema hcatTableSchema = hcatTableInfo.getDataColumns();
+        Map<String, String> hcatHbaseColMap = getHCatHBaseColumnMapping(hcatTableInfo);
+        HashMap<String, Long> revisionMap = new HashMap<String, Long>();
+
+        for (HCatFieldSchema fSchema : hcatTableSchema.getFields()) {
+            if(hcatHbaseColMap.containsKey(fSchema.getName())){
+                String colFamily = hcatHbaseColMap.get(fSchema.getName());
+                long revisionID = hbaseSnapshot.getRevision(colFamily);
+                revisionMap.put(fSchema.getName(), revisionID);
+            }
+        }
+
+        HCatTableSnapshot hcatSnapshot = new HCatTableSnapshot(
+                hcatTableInfo.getDatabaseName(), hcatTableInfo.getTableName(), revisionMap, hbaseSnapshot.getLatestRevision());
+        return hcatSnapshot;
+    }
+
+    static TableSnapshot convertSnapshot(HCatTableSnapshot hcatSnapshot,
+            HCatTableInfo hcatTableInfo) throws IOException {
+
+        HCatSchema hcatTableSchema = hcatTableInfo.getDataColumns();
+        Map<String, Long> revisionMap = new HashMap<String, Long>();
+        Map<String, String> hcatHbaseColMap = getHCatHBaseColumnMapping(hcatTableInfo);
+        for (HCatFieldSchema fSchema : hcatTableSchema.getFields()) {
+            String colFamily = hcatHbaseColMap.get(fSchema.getName());
+            if (hcatSnapshot.containsColumn(fSchema.getName())) {
+                long revision = hcatSnapshot.getRevision(fSchema.getName());
+                revisionMap.put(colFamily, revision);
+            }
+        }
+
+        String fullyQualifiedName = hcatSnapshot.getDatabaseName() + "."
+                + hcatSnapshot.getTableName();
+        return new TableSnapshot(fullyQualifiedName, revisionMap, hcatSnapshot.getLatestRevision());
+
+    }
+
+    /**
+     * Begins a transaction in the revision manager for the given table.
+     * @param qualifiedTableName Name of the table
+     * @param tableInfo HCat Table information
+     * @param jobConf Job Configuration
+     * @return The new transaction in revision manager
+     * @throws IOException
+     */
+    static Transaction beginWriteTransaction(String qualifiedTableName,
+            HCatTableInfo tableInfo, Configuration jobConf) throws IOException {
+        Transaction txn;
+        RevisionManager rm = null;
+        try {
+            rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(jobConf);
+            String hBaseColumns = tableInfo.getStorerInfo().getProperties()
+                    .getProperty(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY);
+            String[] splits = hBaseColumns.split("[,:]");
+            Set<String> families = new HashSet<String>();
+            for (int i = 0; i < splits.length; i += 2) {
+                if (!splits[i].isEmpty())
+                    families.add(splits[i]);
+            }
+            txn = rm.beginWriteTransaction(qualifiedTableName, new ArrayList<String>(families));
+        } finally {
+            HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm);
+        }
+        return txn;
+    }
+
+    static Transaction getWriteTransaction(Configuration conf) throws IOException {
+        OutputJobInfo outputJobInfo = (OutputJobInfo)HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+        return (Transaction) HCatUtil.deserialize(outputJobInfo.getProperties()
+                                                               .getProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY));
+    }
+
+    static void setWriteTransaction(Configuration conf, Transaction txn) throws IOException {
+        OutputJobInfo outputJobInfo = (OutputJobInfo)HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+        outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY, HCatUtil.serialize(txn));
+        conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
+    }
+
+    /**
+     * Get the Revision number that will be assigned to this job's output data
+     * @param conf configuration of the job
+     * @return the revision number used
+     * @throws IOException
+     */
+    static long getOutputRevision(Configuration conf) throws IOException {
+        return getWriteTransaction(conf).getRevisionNumber();
+    }
+
+    private static Map<String, String> getHCatHBaseColumnMapping( HCatTableInfo hcatTableInfo)
+            throws IOException {
+
+        HCatSchema hcatTableSchema = hcatTableInfo.getDataColumns();
+        StorerInfo storeInfo = hcatTableInfo.getStorerInfo();
+        String hbaseColumnMapping = storeInfo.getProperties().getProperty(
+                HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY);
+
+        Map<String, String> hcatHbaseColMap = new HashMap<String, String>();
+        List<String> columnFamilies = new ArrayList<String>();
+        List<String> columnQualifiers = new ArrayList<String>();
+        try {
+            HBaseSerDe.parseColumnMapping(hbaseColumnMapping, columnFamilies,
+                    null, columnQualifiers, null);
+        } catch (SerDeException e) {
+            throw new IOException("Exception while converting snapshots.", e);
+        }
+
+        for (HCatFieldSchema column : hcatTableSchema.getFields()) {
+            int fieldPos = hcatTableSchema.getPosition(column.getName());
+            String colFamily = columnFamilies.get(fieldPos);
+            if (colFamily.equals(HBaseSerDe.HBASE_KEY_COL) == false) {
+                hcatHbaseColMap.put(column.getName(), colFamily);
+            }
+        }
+
+        return hcatHbaseColMap;
+    }
+
+}
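
For orientation, the host list that getOpenedRevisionManager() assembles before
handing properties to RevisionManagerFactory simply suffixes every host from
hbase.zookeeper.quorum with the ZooKeeper client port. A standalone sketch of that
step in plain Java, with the RevisionManager classes left out:

public class ZkHostListSketch {

    // "zk1,zk2,zk3" with port 2181 becomes "zk1:2181,zk2:2181,zk3:2181".
    static String appendClientPort(String quorum, int port) {
        StringBuilder sb = new StringBuilder();
        for (String host : quorum.split(",")) {
            sb.append(host).append(':').append(port).append(',');
        }
        sb.deleteCharAt(sb.length() - 1); // drop the trailing comma
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(appendClientPort("zk1,zk2,zk3", 2181));
    }
}

The resulting string is what ends up under ZKBasedRevisionManager.HOSTLIST in the
properties handed to RevisionManagerFactory.getRevisionManager().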