Posted to hcatalog-commits@incubator.apache.org by to...@apache.org on 2012/03/03 03:56:14 UTC

svn commit: r1296572 - in /incubator/hcatalog/branches/branch-0.4: ./ src/java/org/apache/hcatalog/mapreduce/ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ storage-drivers/h...

Author: toffer
Date: Sat Mar  3 03:56:13 2012
New Revision: 1296572

URL: http://svn.apache.org/viewvc?rev=1296572&view=rev
Log:
HCAT-252 Rework HBase storage driver into HBase storage handler (rohini via toffer)

Added:
    incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java
      - copied unchanged from r1296568, incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java
    incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java
      - copied unchanged from r1296568, incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java
    incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
      - copied unchanged from r1296568, incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
    incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
      - copied unchanged from r1296568, incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
    incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java
      - copied unchanged from r1296568, incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java
    incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java
      - copied unchanged from r1296568, incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java
Removed:
    incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
    incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
    incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java
    incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java
    incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java
    incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java
    incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java.broken
    incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java.broken
    incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java.broken
    incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java
    incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java.broken
Modified:
    incubator/hcatalog/branches/branch-0.4/   (props changed)
    incubator/hcatalog/branches/branch-0.4/CHANGES.txt
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java
    incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
    incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
    incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
    incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
    incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java
    incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java
    incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java
    incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java

Propchange: incubator/hcatalog/branches/branch-0.4/
------------------------------------------------------------------------------
    svn:mergeinfo = /incubator/hcatalog/trunk:1296568

Modified: incubator/hcatalog/branches/branch-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/CHANGES.txt?rev=1296572&r1=1296571&r2=1296572&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.4/CHANGES.txt Sat Mar  3 03:56:13 2012
@@ -21,6 +21,8 @@ Apache HCatalog Change Log
 Release 0.4.0 - Unreleased
 
   INCOMPATIBLE CHANGES
+  HCAT-252 Rework HBase storage driver into HBase storage handler (rohini via toffer) 
+
   HCAT-265 remove deprecated HCatStorageHandler (toffer)
 
   HCAT-239. Changes to HCatInputFormat to make it use SerDes instead of StorageDrivers (vikram.dixit via gates)

Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java?rev=1296572&r1=1296571&r2=1296572&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java Sat Mar  3 03:56:13 2012
@@ -20,11 +20,12 @@ package org.apache.hcatalog.mapreduce;
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.mapred.HCatMapRedUtil;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hcatalog.common.HCatConstants;
@@ -36,6 +37,8 @@ import org.apache.hcatalog.common.HCatUt
  */
 class DefaultOutputCommitterContainer extends OutputCommitterContainer {
 
+    private static final Log LOG = LogFactory.getLog(DefaultOutputCommitterContainer.class);
+
     /**
      * @param context current JobContext
      * @param baseCommitter OutputCommitter to contain
@@ -86,11 +89,9 @@ class DefaultOutputCommitterContainer ex
     public void cleanupJob(JobContext context) throws IOException {
         getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context));
 
-        OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
-
         //Cancel HCat and JobTracker tokens
         try {
-            HiveConf hiveConf = HCatUtil.getHiveConf(null, 
+            HiveConf hiveConf = HCatUtil.getHiveConf(null,
                                                   context.getConfiguration());
             HiveMetaStoreClient client = HCatUtil.createHiveClient(hiveConf);
             String tokenStrForm = client.getTokenStrForm();
@@ -98,7 +99,7 @@ class DefaultOutputCommitterContainer ex
               client.cancelDelegationToken(tokenStrForm);
             }
         } catch (Exception e) {
-            throw new IOException("Failed to cancel delegation token",e);
+            LOG.warn("Failed to cancel delegation token", e);
         }
     }
 }

Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java?rev=1296572&r1=1296571&r2=1296572&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java Sat Mar  3 03:56:13 2012
@@ -30,6 +30,7 @@ import org.apache.hcatalog.common.HCatUt
 import org.apache.hcatalog.data.HCatRecord;
 
 import java.io.IOException;
+import java.text.NumberFormat;
 
 /**
  * Bare bones implementation of OutputFormatContainer. Does only the required
@@ -38,10 +39,20 @@ import java.io.IOException;
  */
 class DefaultOutputFormatContainer extends OutputFormatContainer {
 
+    private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+
+    static {
+      NUMBER_FORMAT.setMinimumIntegerDigits(5);
+      NUMBER_FORMAT.setGroupingUsed(false);
+    }
+
     public DefaultOutputFormatContainer(org.apache.hadoop.mapred.OutputFormat<WritableComparable<?>, Writable> of) {
         super(of);
     }
 
+    static synchronized String getOutputName(int partition) {
+        return "part-" + NUMBER_FORMAT.format(partition);
+      }
 
     /**
      * Get the record writer for the job. Uses the Table's default OutputStorageDriver
@@ -53,8 +64,9 @@ class DefaultOutputFormatContainer exten
     @Override
     public RecordWriter<WritableComparable<?>, HCatRecord>
     getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+        String name = getOutputName(context.getTaskAttemptID().getTaskID().getId());
         return new DefaultRecordWriterContainer(context,
-                getBaseOutputFormat().getRecordWriter(null, new JobConf(context.getConfiguration()),null, InternalUtil.createReporter(context)));
+                getBaseOutputFormat().getRecordWriter(null, new JobConf(context.getConfiguration()), name, InternalUtil.createReporter(context)));
     }
 
 

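For context, the new getOutputName() mirrors the part-file naming used by Hadoop's FileOutputFormat, so the wrapped mapred OutputFormat now writes each task's output to its own "part-NNNNN" file instead of the null name passed previously. A minimal standalone sketch of that naming (the PartNameDemo class is hypothetical, for illustration only):

    import java.text.NumberFormat;

    public class PartNameDemo {
        // Same formatting rules as the patch: at least five digits, no grouping separators.
        private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();

        static {
            NUMBER_FORMAT.setMinimumIntegerDigits(5);
            NUMBER_FORMAT.setGroupingUsed(false);
        }

        static synchronized String getOutputName(int partition) {
            return "part-" + NUMBER_FORMAT.format(partition);
        }

        public static void main(String[] args) {
            System.out.println(getOutputName(3));    // prints part-00003
            System.out.println(getOutputName(123));  // prints part-00123
        }
    }
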
Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java?rev=1296572&r1=1296571&r2=1296572&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java Sat Mar  3 03:56:13 2012
@@ -72,7 +72,7 @@ class DefaultRecordWriterContainer exten
     public void write(WritableComparable<?> key, HCatRecord value) throws IOException,
             InterruptedException {
         try {
-            getBaseRecordWriter().write(null, serDe.serialize(value, hcatRecordOI));
+            getBaseRecordWriter().write(null, serDe.serialize(value.getAll(), hcatRecordOI));
         } catch (SerDeException e) {
             throw new IOException("Failed to serialize object",e);
         }

Modified: incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java?rev=1296572&r1=1296571&r2=1296572&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java Sat Mar  3 03:56:13 2012
@@ -1,160 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hcatalog.hbase;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.util.Progressable;
 import org.apache.hcatalog.hbase.snapshot.RevisionManager;
-import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
-import org.apache.hcatalog.hbase.snapshot.Transaction;
-import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.OutputJobInfo;
-
-import java.io.IOException;
 
 /**
- * Class which imports data into HBase via it's "bulk load" feature. Wherein regions
- * are created by the MR job using HFileOutputFormat and then later "moved" into
- * the appropriate region server.
+ * Class which imports data into HBase via its "bulk load" feature, wherein
+ * regions are created by the MR job using HFileOutputFormat and then later
+ * "moved" into the appropriate region server.
  */
-class HBaseBulkOutputFormat extends OutputFormat<WritableComparable<?>,Put> {
-    private final static ImmutableBytesWritable EMPTY_LIST = new ImmutableBytesWritable(new byte[0]);
-    private SequenceFileOutputFormat<WritableComparable<?>,Put> baseOutputFormat;
-    private final static Log LOG = LogFactory.getLog(HBaseBulkOutputFormat.class);
+class HBaseBulkOutputFormat extends HBaseBaseOutputFormat {
+
+    private final static ImmutableBytesWritable EMPTY_LIST = new ImmutableBytesWritable(
+            new byte[0]);
+    private SequenceFileOutputFormat<WritableComparable<?>, Put> baseOutputFormat;
 
     public HBaseBulkOutputFormat() {
-        baseOutputFormat = new SequenceFileOutputFormat<WritableComparable<?>,Put>();
+        baseOutputFormat = new SequenceFileOutputFormat<WritableComparable<?>, Put>();
     }
 
     @Override
-    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
-        baseOutputFormat.checkOutputSpecs(context);
-        //Get jobTracker delegation token if security is enabled
-        //we need to launch the ImportSequenceFile job
-        if(context.getConfiguration().getBoolean("hadoop.security.authorization",false)) {
-            JobClient jobClient = new JobClient(new JobConf(context.getConfiguration()));
-            context.getCredentials().addToken(new Text("my mr token"), jobClient.getDelegationToken(null));
-        }
+    public void checkOutputSpecs(FileSystem ignored, JobConf job)
+            throws IOException {
+        job.setOutputKeyClass(ImmutableBytesWritable.class);
+        job.setOutputValueClass(Put.class);
+        job.setOutputCommitter(HBaseBulkOutputCommitter.class);
+        baseOutputFormat.checkOutputSpecs(ignored, job);
+        getJTDelegationToken(job);
     }
 
     @Override
-    public RecordWriter<WritableComparable<?>, Put> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
-        //TODO use a constant/static setter when available
-        context.getConfiguration().setClass("mapred.output.key.class",ImmutableBytesWritable.class,Object.class);
-        context.getConfiguration().setClass("mapred.output.value.class",Put.class,Object.class);
-        return new HBaseBulkRecordWriter(baseOutputFormat.getRecordWriter(context));
+    public RecordWriter<WritableComparable<?>, Put> getRecordWriter(
+            FileSystem ignored, JobConf job, String name, Progressable progress)
+            throws IOException {
+        long version = HBaseRevisionManagerUtil.getOutputRevision(job);
+        return new HBaseBulkRecordWriter(baseOutputFormat.getRecordWriter(
+                ignored, job, name, progress), version);
     }
 
-    @Override
-    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
-        return new HBaseBulkOutputCommitter(baseOutputFormat.getOutputCommitter(context));
+    private void getJTDelegationToken(JobConf job) throws IOException {
+        // Get jobTracker delegation token if security is enabled
+        // we need to launch the ImportSequenceFile job
+        if (job.getBoolean("hadoop.security.authorization", false)) {
+            JobClient jobClient = new JobClient(new JobConf(job));
+            try {
+                job.getCredentials().addToken(new Text("my mr token"),
+                        jobClient.getDelegationToken(null));
+            } catch (InterruptedException e) {
+                throw new IOException("Error while getting JT delegation token", e);
+            }
+        }
     }
 
-    private static class HBaseBulkRecordWriter extends  RecordWriter<WritableComparable<?>,Put> {
-        private RecordWriter<WritableComparable<?>,Put> baseWriter;
+    private static class HBaseBulkRecordWriter implements
+            RecordWriter<WritableComparable<?>, Put> {
 
-        public HBaseBulkRecordWriter(RecordWriter<WritableComparable<?>,Put> baseWriter)  {
+        private RecordWriter<WritableComparable<?>, Put> baseWriter;
+        private final Long outputVersion;
+
+        public HBaseBulkRecordWriter(
+                RecordWriter<WritableComparable<?>, Put> baseWriter,
+                Long outputVersion) {
             this.baseWriter = baseWriter;
+            this.outputVersion = outputVersion;
         }
 
         @Override
-        public void write(WritableComparable<?> key, Put value) throws IOException, InterruptedException {
-            //we ignore the key
-            baseWriter.write(EMPTY_LIST, value);
+        public void write(WritableComparable<?> key, Put value)
+                throws IOException {
+            Put put = value;
+            if (outputVersion != null) {
+                put = new Put(value.getRow(), outputVersion.longValue());
+                for (List<KeyValue> row : value.getFamilyMap().values()) {
+                    for (KeyValue el : row) {
+                        put.add(el.getFamily(), el.getQualifier(), el.getValue());
+                    }
+                }
+            }
+            // we ignore the key
+            baseWriter.write(EMPTY_LIST, put);
         }
 
         @Override
-        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-            baseWriter.close(context);
+        public void close(Reporter reporter) throws IOException {
+            baseWriter.close(reporter);
         }
     }
 
-    private static class HBaseBulkOutputCommitter extends OutputCommitter {
-        private OutputCommitter baseOutputCommitter;
+    public static class HBaseBulkOutputCommitter extends OutputCommitter {
+
+        private final OutputCommitter baseOutputCommitter;
 
-        public HBaseBulkOutputCommitter(OutputCommitter baseOutputCommitter) throws IOException {
-            this.baseOutputCommitter = baseOutputCommitter;
+        public HBaseBulkOutputCommitter() {
+            baseOutputCommitter = new FileOutputCommitter();
         }
 
         @Override
-        public void abortTask(TaskAttemptContext context) throws IOException {
-            baseOutputCommitter.abortTask(context);
+        public void abortTask(TaskAttemptContext taskContext)
+                throws IOException {
+            baseOutputCommitter.abortTask(taskContext);
         }
 
         @Override
-        public void commitTask(TaskAttemptContext context) throws IOException {
-            baseOutputCommitter.commitTask(context);
+        public void commitTask(TaskAttemptContext taskContext)
+                throws IOException {
+            baseOutputCommitter.commitTask(taskContext);
         }
 
         @Override
-        public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
-            return baseOutputCommitter.needsTaskCommit(context);
+        public boolean needsTaskCommit(TaskAttemptContext taskContext)
+                throws IOException {
+            return baseOutputCommitter.needsTaskCommit(taskContext);
         }
 
         @Override
-        public void setupJob(JobContext context) throws IOException {
-            baseOutputCommitter.setupJob(context);
+        public void setupJob(JobContext jobContext) throws IOException {
+            baseOutputCommitter.setupJob(jobContext);
         }
 
         @Override
-        public void setupTask(TaskAttemptContext context) throws IOException {
-            baseOutputCommitter.setupTask(context);
+        public void setupTask(TaskAttemptContext taskContext)
+                throws IOException {
+            baseOutputCommitter.setupTask(taskContext);
         }
 
         @Override
-        public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+        public void abortJob(JobContext jobContext, int status)
+                throws IOException {
+            baseOutputCommitter.abortJob(jobContext, status);
             RevisionManager rm = null;
             try {
-                baseOutputCommitter.abortJob(jobContext,state);
-                rm = HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
-                rm.abortWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
+                rm = HBaseRevisionManagerUtil
+                        .getOpenedRevisionManager(jobContext.getConfiguration());
+                rm.abortWriteTransaction(HBaseRevisionManagerUtil
+                        .getWriteTransaction(jobContext.getConfiguration()));
             } finally {
                 cleanIntermediate(jobContext);
-                if(rm != null)
+                if (rm != null)
                     rm.close();
             }
         }
 
         @Override
         public void commitJob(JobContext jobContext) throws IOException {
+            baseOutputCommitter.commitJob(jobContext);
             RevisionManager rm = null;
             try {
-                baseOutputCommitter.commitJob(jobContext);
                 Configuration conf = jobContext.getConfiguration();
-                Path srcPath = FileOutputFormat.getOutputPath(jobContext);
-                Path destPath = new Path(srcPath.getParent(),srcPath.getName()+"_hfiles");
+                Path srcPath = FileOutputFormat.getOutputPath(jobContext.getJobConf());
+                Path destPath = new Path(srcPath.getParent(), srcPath.getName() + "_hfiles");
                 ImportSequenceFile.runJob(jobContext,
-                                          conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY),
-                                          srcPath,
-                                          destPath);
-                rm = HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
-                rm.commitWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
+                                conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY),
+                                srcPath,
+                                destPath);
+                rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+                rm.commitWriteTransaction(HBaseRevisionManagerUtil.getWriteTransaction(conf));
                 cleanIntermediate(jobContext);
             } finally {
-                if(rm != null)
+                if (rm != null)
                     rm.close();
             }
         }
 
-        public void cleanIntermediate(JobContext jobContext) throws IOException {
+        private void cleanIntermediate(JobContext jobContext)
+                throws IOException {
             FileSystem fs = FileSystem.get(jobContext.getConfiguration());
-            fs.delete(FileOutputFormat.getOutputPath(jobContext),true);
+            fs.delete(FileOutputFormat.getOutputPath(jobContext.getJobConf()), true);
         }
     }
 }

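Both the bulk and direct record writers in this patch pin the write transaction's revision number as the cell timestamp by copying each incoming Put into a new Put built with that revision. A minimal sketch of that copy, assuming the 0.90-era HBase client API used by this branch (Put#getFamilyMap returning Map<byte[], List<KeyValue>>); the helper class name is hypothetical:

    import java.util.List;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Put;

    final class RevisionedPuts {
        /**
         * Re-creates a Put with the given revision as its timestamp, re-adding every
         * cell so it inherits that timestamp. Mirrors the writers' behavior when a
         * write transaction is present; with no revision the Put passes through as-is.
         */
        static Put withRevision(Put original, Long revision) {
            if (revision == null) {
                return original;
            }
            Put copy = new Put(original.getRow(), revision.longValue());
            for (List<KeyValue> cells : original.getFamilyMap().values()) {
                for (KeyValue kv : cells) {
                    copy.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
                }
            }
            return copy;
        }
    }
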
Modified: incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java?rev=1296572&r1=1296571&r2=1296572&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java Sat Mar  3 03:56:13 2012
@@ -18,104 +18,135 @@
 
 package org.apache.hcatalog.hbase;
 
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
-import org.apache.hadoop.io.Writable;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.util.Progressable;
 import org.apache.hcatalog.hbase.snapshot.RevisionManager;
-
-
-import java.io.IOException;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
 
 /**
- * "Direct" implementation of OutputFormat for HBase. Uses HTable client's put API to write each row to HBase one a
- * time. Presently it is just using TableOutputFormat as the underlying implementation in the future we can
- * tune this to make the writes faster such as permanently disabling WAL, caching, etc.
+ * "Direct" implementation of OutputFormat for HBase. Uses HTable client's put
+ * API to write each row to HBase one a time. Presently it is just using
+ * TableOutputFormat as the underlying implementation in the future we can tune
+ * this to make the writes faster such as permanently disabling WAL, caching,
+ * etc.
  */
-class HBaseDirectOutputFormat extends OutputFormat<WritableComparable<?>,Writable> implements Configurable {
+class HBaseDirectOutputFormat extends HBaseBaseOutputFormat {
 
-    private TableOutputFormat<WritableComparable<?>> outputFormat;
+    private TableOutputFormat outputFormat;
 
     public HBaseDirectOutputFormat() {
-        this.outputFormat = new TableOutputFormat<WritableComparable<?>>();
+        this.outputFormat = new TableOutputFormat();
     }
 
     @Override
-    public RecordWriter<WritableComparable<?>, Writable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
-        return outputFormat.getRecordWriter(context);
+    public RecordWriter<WritableComparable<?>, Put> getRecordWriter(FileSystem ignored,
+            JobConf job, String name, Progressable progress)
+            throws IOException {
+        long version = HBaseRevisionManagerUtil.getOutputRevision(job);
+        return new HBaseDirectRecordWriter(outputFormat.getRecordWriter(ignored, job, name,
+                progress), version);
     }
 
     @Override
-    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
-        outputFormat.checkOutputSpecs(context);
-    }
+    public void checkOutputSpecs(FileSystem ignored, JobConf job)
+            throws IOException {
+        job.setOutputCommitter(HBaseDirectOutputCommitter.class);
+        job.setIfUnset(TableOutputFormat.OUTPUT_TABLE,
+                job.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY));
+        outputFormat.checkOutputSpecs(ignored, job);
+    }
+
+    private static class HBaseDirectRecordWriter implements
+            RecordWriter<WritableComparable<?>, Put> {
+
+        private RecordWriter<WritableComparable<?>, Put> baseWriter;
+        private final Long outputVersion;
+
+        public HBaseDirectRecordWriter(
+                RecordWriter<WritableComparable<?>, Put> baseWriter,
+                Long outputVersion) {
+            this.baseWriter = baseWriter;
+            this.outputVersion = outputVersion;
+        }
 
-    @Override
-    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
-        return new HBaseDirectOutputCommitter(outputFormat.getOutputCommitter(context));
-    }
+        @Override
+        public void write(WritableComparable<?> key, Put value)
+                throws IOException {
+            Put put = value;
+            if (outputVersion != null) {
+                put = new Put(value.getRow(), outputVersion.longValue());
+                for (List<KeyValue> row : value.getFamilyMap().values()) {
+                    for (KeyValue el : row) {
+                        put.add(el.getFamily(), el.getQualifier(), el.getValue());
+                    }
+                }
+            }
+            baseWriter.write(key, put);
+        }
 
-    @Override
-    public void setConf(Configuration conf) {
-        String tableName = conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY);
-        conf = new Configuration(conf);
-        conf.set(TableOutputFormat.OUTPUT_TABLE,tableName);
-        outputFormat.setConf(conf);
-    }
+        @Override
+        public void close(Reporter reporter) throws IOException {
+            baseWriter.close(reporter);
+        }
 
-    @Override
-    public Configuration getConf() {
-        return outputFormat.getConf();
     }
 
-    private static class HBaseDirectOutputCommitter extends OutputCommitter {
-        private OutputCommitter baseOutputCommitter;
+    public static class HBaseDirectOutputCommitter extends OutputCommitter {
 
-        public HBaseDirectOutputCommitter(OutputCommitter baseOutputCommitter) throws IOException {
-            this.baseOutputCommitter = baseOutputCommitter;
+        public HBaseDirectOutputCommitter() throws IOException {
         }
 
         @Override
-        public void abortTask(TaskAttemptContext context) throws IOException {
-            baseOutputCommitter.abortTask(context);
+        public void abortTask(TaskAttemptContext taskContext)
+                throws IOException {
         }
 
         @Override
-        public void commitTask(TaskAttemptContext context) throws IOException {
-            baseOutputCommitter.commitTask(context);
+        public void commitTask(TaskAttemptContext taskContext)
+                throws IOException {
         }
 
         @Override
-        public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
-            return baseOutputCommitter.needsTaskCommit(context);
+        public boolean needsTaskCommit(TaskAttemptContext taskContext)
+                throws IOException {
+            return false;
         }
 
         @Override
-        public void setupJob(JobContext context) throws IOException {
-            baseOutputCommitter.setupJob(context);
+        public void setupJob(JobContext jobContext) throws IOException {
         }
 
         @Override
-        public void setupTask(TaskAttemptContext context) throws IOException {
-            baseOutputCommitter.setupTask(context);
+        public void setupTask(TaskAttemptContext taskContext)
+                throws IOException {
         }
 
         @Override
-        public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+        public void abortJob(JobContext jobContext, int status)
+                throws IOException {
+            super.abortJob(jobContext, status);
             RevisionManager rm = null;
             try {
-                baseOutputCommitter.abortJob(jobContext, state);
-                rm = HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
-                rm.abortWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
+                rm = HBaseRevisionManagerUtil
+                        .getOpenedRevisionManager(jobContext.getConfiguration());
+                Transaction writeTransaction = HBaseRevisionManagerUtil
+                        .getWriteTransaction(jobContext.getConfiguration());
+                rm.abortWriteTransaction(writeTransaction);
             } finally {
-                if(rm != null)
+                if (rm != null)
                     rm.close();
             }
         }
@@ -124,11 +155,12 @@ class HBaseDirectOutputFormat extends Ou
         public void commitJob(JobContext jobContext) throws IOException {
             RevisionManager rm = null;
             try {
-                baseOutputCommitter.commitJob(jobContext);
-                rm = HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
-                rm.commitWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
+                rm = HBaseRevisionManagerUtil
+                        .getOpenedRevisionManager(jobContext.getConfiguration());
+                rm.commitWriteTransaction(HBaseRevisionManagerUtil.getWriteTransaction(jobContext
+                        .getConfiguration()));
             } finally {
-                if(rm != null)
+                if (rm != null)
                     rm.close();
             }
         }

Modified: incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java?rev=1296572&r1=1296571&r2=1296572&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java Sat Mar  3 03:56:13 2012
@@ -19,22 +19,24 @@
 package org.apache.hcatalog.hbase;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hive.hbase.HBaseSerDe;
@@ -50,13 +52,11 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.hbase.snapshot.RevisionManager;
-import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
-import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
 import org.apache.hcatalog.hbase.snapshot.Transaction;
 import org.apache.hcatalog.hbase.snapshot.ZKBasedRevisionManager;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
@@ -74,22 +74,93 @@ import com.facebook.fb303.FacebookBase;
  * tables through HCatalog. The implementation is very similar to the
  * HiveHBaseStorageHandler, with more details to suit HCatalog.
  */
-public class HBaseHCatStorageHandler extends HCatStorageHandler implements HiveMetaHook {
+//TODO remove serializable when HCATALOG-282 is fixed
+public class HBaseHCatStorageHandler extends HCatStorageHandler implements HiveMetaHook, Serializable {
 
-    final static public String DEFAULT_PREFIX = "default.";
+    public final static String DEFAULT_PREFIX = "default.";
+    private final static String PROPERTY_INT_OUTPUT_LOCATION = "hcat.hbase.mapreduce.intermediateOutputLocation";
 
-    private Configuration      hbaseConf;
-
-    private HBaseAdmin         admin;
+    private transient Configuration      hbaseConf;
+    private transient HBaseAdmin         admin;
 
     @Override
     public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
-        //TODO complete rework and fill this in
+        // Populate jobProperties with input table name, table columns, RM snapshot,
+        // hbase-default.xml and hbase-site.xml
+        Map<String, String> tableJobProperties = tableDesc.getJobProperties();
+        String jobString = tableJobProperties.get(HCatConstants.HCAT_KEY_JOB_INFO);
+        try {
+            InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(jobString);
+            HCatTableInfo tableInfo = inputJobInfo.getTableInfo();
+            String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo);
+            jobProperties.put(TableInputFormat.INPUT_TABLE, qualifiedTableName);
+
+            Configuration jobConf = getConf();
+            String outputSchema = jobConf.get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
+            jobProperties.put(TableInputFormat.SCAN_COLUMNS, getScanColumns(tableInfo, outputSchema));
+
+            String serSnapshot = (String) inputJobInfo.getProperties().get(
+                    HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
+            if (serSnapshot == null) {
+                Configuration conf = addHbaseResources(jobConf);
+                HCatTableSnapshot snapshot = HBaseRevisionManagerUtil.createSnapshot(conf,
+                        qualifiedTableName, tableInfo);
+                jobProperties.put(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY,
+                        HCatUtil.serialize(snapshot));
+            }
+
+            addHbaseResources(jobConf, jobProperties);
+
+        } catch (IOException e) {
+            throw new IllegalStateException("Error while configuring job properties", e);
+        }
     }
 
     @Override
     public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
-        //TODO complete rework and fill this in
+        // Populate jobProperties with output table name, hbase-default.xml, hbase-site.xml, OutputJobInfo
+        // Populate RM transaction in OutputJobInfo
+        // In case of bulk mode, populate intermediate output location
+        Map<String, String> tableJobProperties = tableDesc.getJobProperties();
+        String jobString = tableJobProperties.get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+        try {
+            OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(jobString);
+            HCatTableInfo tableInfo = outputJobInfo.getTableInfo();
+            String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo);
+            jobProperties.put(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, qualifiedTableName);
+
+            Configuration jobConf = getConf();
+            String txnString = outputJobInfo.getProperties().getProperty(
+                    HBaseConstants.PROPERTY_WRITE_TXN_KEY);
+            if (txnString == null) {
+                Configuration conf = addHbaseResources(jobConf);
+                Transaction txn = HBaseRevisionManagerUtil.beginWriteTransaction(qualifiedTableName, tableInfo, conf);
+                outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
+                        HCatUtil.serialize(txn));
+
+                if (isBulkMode(outputJobInfo) && !(outputJobInfo.getProperties()
+                                .containsKey(PROPERTY_INT_OUTPUT_LOCATION))) {
+                    String tableLocation = tableInfo.getTableLocation();
+                    String location = new Path(tableLocation, "REVISION_" + txn.getRevisionNumber())
+                            .toString();
+                    outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION,
+                            location);
+                    // We are writing out an intermediate sequenceFile hence
+                    // location is not passed in OutputJobInfo.getLocation()
+                    // TODO replace this with a mapreduce constant when available
+                    jobProperties.put("mapred.output.dir", location);
+                }
+            }
+
+            jobProperties
+                    .put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
+            addHbaseResources(jobConf, jobProperties);
+            addOutputDependencyJars(jobConf);
+            jobProperties.put("tmpjars", jobConf.get("tmpjars"));
+
+        } catch (IOException e) {
+            throw new IllegalStateException("Error while configuring job properties", e);
+        }
     }
 
     /*
@@ -231,7 +302,7 @@ public class HBaseHCatStorageHandler ext
             new HTable(hbaseConf, tableDesc.getName());
 
             //Set up znodes in revision manager.
-            RevisionManager rm = getOpenedRevisionManager(hbaseConf);
+            RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(hbaseConf);
             if (rm instanceof ZKBasedRevisionManager) {
                 ZKBasedRevisionManager zkRM = (ZKBasedRevisionManager) rm;
                 zkRM.setUpZNodes(tableName, new ArrayList<String>(
@@ -295,36 +366,6 @@ public class HBaseHCatStorageHandler ext
         return this;
     }
 
-//TODO finish rework remove this
-//    /*
-//     * @param tableDesc
-//     *
-//     * @param jobProperties
-//     *
-//     * @see org.apache.hcatalog.storagehandler.HCatStorageHandler
-//     * #configureTableJobProperties(org.apache.hadoop.hive.ql.plan.TableDesc,
-//     * java.util.Map)
-//     */
-//    @Override
-//    public void configureTableJobProperties(TableDesc tableDesc,
-//            Map<String, String> jobProperties) {
-//        Properties tableProperties = tableDesc.getProperties();
-//
-//        jobProperties.put(HBaseSerDe.HBASE_COLUMNS_MAPPING,
-//                tableProperties.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING));
-//
-//        String tableName = tableProperties
-//                .getProperty(HBaseSerDe.HBASE_TABLE_NAME);
-//        if (tableName == null) {
-//            tableName = tableProperties.getProperty(Constants.META_TABLE_NAME);
-//            if (tableName.startsWith(DEFAULT_PREFIX)) {
-//                tableName = tableName.substring(DEFAULT_PREFIX.length());
-//            }
-//        }
-//        jobProperties.put(HBaseSerDe.HBASE_TABLE_NAME, tableName);
-//
-//    }
-
     private HBaseAdmin getHBaseAdmin() throws MetaException {
         try {
             if (admin == null) {
@@ -356,14 +397,12 @@ public class HBaseHCatStorageHandler ext
 
     @Override
     public Class<? extends InputFormat> getInputFormatClass() {
-        //TODO replace this with rework
-        return InputFormat.class;
+        return HBaseInputFormat.class;
     }
 
     @Override
     public Class<? extends OutputFormat> getOutputFormatClass() {
-        //TODO replace this with rework
-        return SequenceFileOutputFormat.class;
+        return HBaseBaseOutputFormat.class;
     }
 
     /*
@@ -397,6 +436,7 @@ public class HBaseHCatStorageHandler ext
     private void checkDeleteTable(Table table) throws MetaException {
         boolean isExternal = MetaStoreUtils.isExternalTable(table);
         String tableName = getHBaseTableName(table);
+        RevisionManager rm = null;
         try {
             if (!isExternal && getHBaseAdmin().tableExists(tableName)) {
                 // we have created an HBase table, so we delete it to roll back;
@@ -406,7 +446,7 @@ public class HBaseHCatStorageHandler ext
                 getHBaseAdmin().deleteTable(tableName);
 
               //Set up znodes in revision manager.
-                RevisionManager rm = getOpenedRevisionManager(hbaseConf);
+                rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(hbaseConf);
                 if (rm instanceof ZKBasedRevisionManager) {
                     ZKBasedRevisionManager zkRM = (ZKBasedRevisionManager) rm;
                     zkRM.deleteZNodes(tableName);
@@ -414,6 +454,8 @@ public class HBaseHCatStorageHandler ext
             }
         } catch (IOException ie) {
             throw new MetaException(StringUtils.stringifyException(ie));
+        } finally {
+            HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm);
         }
     }
 
@@ -436,9 +478,7 @@ public class HBaseHCatStorageHandler ext
      * @param conf
      * @throws IOException
      */
-    public static void addDependencyJars(Configuration conf) throws IOException {
-        //TODO provide a facility/interface for loading/specifying dependencies
-        //Ideally this method shouldn't be exposed to the user
+    private void addOutputDependencyJars(Configuration conf) throws IOException {
         TableMapReduceUtil.addDependencyJars(conf,
                 //hadoop-core
                 Writable.class,
@@ -452,8 +492,6 @@ public class HBaseHCatStorageHandler ext
                 HCatOutputFormat.class,
                 //hive hbase storage handler jar
                 HBaseSerDe.class,
-                //hcat hbase storage driver jar
-                HBaseOutputStorageDriver.class,
                 //hive jar
                 Table.class,
                 //libthrift jar
@@ -464,152 +502,86 @@ public class HBaseHCatStorageHandler ext
                 FacebookBase.class);
     }
 
-
     /**
-     * Creates the latest snapshot of the table.
-     *
-     * @param jobConf The job configuration.
-     * @param hbaseTableName The fully qualified name of the HBase table.
-     * @return An instance of HCatTableSnapshot
-     * @throws IOException Signals that an I/O exception has occurred.
-     */
-    public static HCatTableSnapshot createSnapshot(Configuration jobConf,
-            String hbaseTableName ) throws IOException {
-
-        RevisionManager rm = null;
-        TableSnapshot snpt;
-        try {
-            rm = getOpenedRevisionManager(jobConf);
-            snpt = rm.createSnapshot(hbaseTableName);
-        } finally {
-            if (rm != null)
-                rm.close();
-        }
-
-        String inputJobString = jobConf.get(HCatConstants.HCAT_KEY_JOB_INFO);
-        if(inputJobString == null){
-            throw new IOException(
-                    "InputJobInfo information not found in JobContext. "
-                            + "HCatInputFormat.setInput() not called?");
-        }
-        InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(inputJobString);
-        HCatTableSnapshot hcatSnapshot = HBaseInputStorageDriver
-                .convertSnapshot(snpt, inputInfo.getTableInfo());
-
-        return hcatSnapshot;
+     * Utility method to get a new Configuration with hbase-default.xml and hbase-site.xml added
+     * @param jobConf existing configuration
+     * @return a new Configuration with hbase-default.xml and hbase-site.xml added
+     */
+    private Configuration addHbaseResources(Configuration jobConf) {
+        Configuration conf = new Configuration(jobConf);
+        HBaseConfiguration.addHbaseResources(conf);
+        return conf;
     }
 
     /**
-     * Creates the snapshot using the revision specified by the user.
-     *
-     * @param jobConf The job configuration.
-     * @param tableName The fully qualified name of the table whose snapshot is being taken.
-     * @param revision The revision number to use for the snapshot.
-     * @return An instance of HCatTableSnapshot.
-     * @throws IOException Signals that an I/O exception has occurred.
-     */
-    public static HCatTableSnapshot createSnapshot(Configuration jobConf,
-            String tableName, long revision)
-            throws IOException {
-
-        TableSnapshot snpt;
-        RevisionManager rm = null;
-        try {
-            rm = getOpenedRevisionManager(jobConf);
-            snpt = rm.createSnapshot(tableName, revision);
-        } finally {
-            if (rm != null)
-                rm.close();
-        }
-
-        String inputJobString = jobConf.get(HCatConstants.HCAT_KEY_JOB_INFO);
-        if(inputJobString == null){
-            throw new IOException(
-                    "InputJobInfo information not found in JobContext. "
-                            + "HCatInputFormat.setInput() not called?");
+     * Utility method to add hbase-default.xml and hbase-site.xml properties to a new map
+     * if they are not already present in the jobConf.
+     * @param jobConf Job configuration
+     * @param newJobProperties  Map to which new properties should be added
+     */
+    private void addHbaseResources(Configuration jobConf,
+            Map<String, String> newJobProperties) {
+        Configuration conf = new Configuration(false);
+        HBaseConfiguration.addHbaseResources(conf);
+        for (Entry<String, String> entry : conf) {
+            if (jobConf.get(entry.getKey()) == null)
+                newJobProperties.put(entry.getKey(), entry.getValue());
         }
-        InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(inputJobString);
-        HCatTableSnapshot hcatSnapshot = HBaseInputStorageDriver
-                .convertSnapshot(snpt, inputInfo.getTableInfo());
-
-        return hcatSnapshot;
     }
 
-    /**
-     * Gets an instance of revision manager which is opened.
-     *
-     * @param jobConf The job configuration.
-     * @return RevisionManager An instance of revision manager.
-     * @throws IOException
-     */
-    static RevisionManager getOpenedRevisionManager(Configuration jobConf) throws IOException {
-
-        Properties properties = new Properties();
-        String zkHostList = jobConf.get(HConstants.ZOOKEEPER_QUORUM);
-        int port = jobConf.getInt("hbase.zookeeper.property.clientPort",
-                HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
-
-        if (zkHostList != null) {
-            String[] splits = zkHostList.split(",");
-            StringBuffer sb = new StringBuffer();
-            for (String split : splits) {
-                sb.append(split);
-                sb.append(':');
-                sb.append(port);
-                sb.append(',');
-            }
-
-            sb.deleteCharAt(sb.length() - 1);
-            properties.put(ZKBasedRevisionManager.HOSTLIST, sb.toString());
-        }
-        String dataDir = jobConf.get(ZKBasedRevisionManager.DATADIR);
-        if (dataDir != null) {
-            properties.put(ZKBasedRevisionManager.DATADIR, dataDir);
-        }
-        String rmClassName = jobConf.get(
-                RevisionManager.REVISION_MGR_IMPL_CLASS,
-                ZKBasedRevisionManager.class.getName());
-        properties.put(RevisionManager.REVISION_MGR_IMPL_CLASS, rmClassName);
-        RevisionManager revisionManger = RevisionManagerFactory
-                .getRevisionManager(properties);
-        revisionManger.open();
-        return revisionManger;
-    }
-
-    /**
-     * Set snapshot as a property.
-     *
-     * @param snapshot The HCatTableSnapshot to be passed to the job.
-     * @param inpJobInfo The InputJobInfo for the job.
-     * @throws IOException
-     */
-    public void setSnapshot(HCatTableSnapshot snapshot, InputJobInfo inpJobInfo)
-            throws IOException {
-        String serializedSnp = HCatUtil.serialize(snapshot);
-        inpJobInfo.getProperties().setProperty(
-                HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, serializedSnp);
-    }
-
-    static Transaction getWriteTransaction(Configuration conf) throws IOException {
-        OutputJobInfo outputJobInfo = (OutputJobInfo)HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
-        return (Transaction) HCatUtil.deserialize(outputJobInfo.getProperties()
-                                                               .getProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY));
-    }
-
-    static void setWriteTransaction(Configuration conf, Transaction txn) throws IOException {
-        OutputJobInfo outputJobInfo = (OutputJobInfo)HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
-        outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY, HCatUtil.serialize(txn));
-        conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
-    }
-
-    /**
-     * Get the Revision number that will be assigned to this job's output data
-     * @param conf configuration of the job
-     * @return the revision number used
-     * @throws IOException
-     */
-    public static long getOutputRevision(Configuration conf) throws IOException {
-        return getWriteTransaction(conf).getRevisionNumber();
+    public static boolean isBulkMode(OutputJobInfo outputJobInfo) {
+        //Default is false
+        String bulkMode = outputJobInfo.getTableInfo().getStorerInfo().getProperties()
+                .getProperty(HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY,
+                        "false");
+        return "true".equals(bulkMode);
+    }
+
+    private String getScanColumns(HCatTableInfo tableInfo, String outputColSchema) throws IOException {
+        StringBuilder builder = new StringBuilder();
+        String hbaseColumnMapping = tableInfo.getStorerInfo().getProperties()
+                .getProperty(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY);
+        if (outputColSchema == null) {
+            String[] splits = hbaseColumnMapping.split("[,]");
+            for (int i = 0; i < splits.length; i++) {
+                if (!splits[i].equals(HBaseSerDe.HBASE_KEY_COL))
+                    builder.append(splits[i]).append(" ");
+            }
+        } else {
+            HCatSchema outputSchema = (HCatSchema) HCatUtil.deserialize(outputColSchema);
+            HCatSchema tableSchema = tableInfo.getDataColumns();
+            List<String> outputFieldNames = outputSchema.getFieldNames();
+            List<Integer> outputColumnMapping = new ArrayList<Integer>();
+            for(String fieldName: outputFieldNames){
+                int position = tableSchema.getPosition(fieldName);
+                outputColumnMapping.add(position);
+            }
+            try {
+                List<String> columnFamilies = new ArrayList<String>();
+                List<String> columnQualifiers = new ArrayList<String>();
+                HBaseSerDe.parseColumnMapping(hbaseColumnMapping, columnFamilies, null,
+                        columnQualifiers, null);
+                for (int i = 0; i < outputColumnMapping.size(); i++) {
+                    int cfIndex = outputColumnMapping.get(i);
+                    String cf = columnFamilies.get(cfIndex);
+                    // We skip the key column.
+                    if (cf.equals(HBaseSerDe.HBASE_KEY_COL) == false) {
+                        String qualifier = columnQualifiers.get(i);
+                        builder.append(cf);
+                        builder.append(":");
+                        if (qualifier != null) {
+                            builder.append(qualifier);
+                        }
+                        builder.append(" ");
+                    }
+                }
+            } catch (SerDeException e) {
+                throw new IOException(e);
+            }
+        }
+        //Remove the extra space delimiter
+        builder.deleteCharAt(builder.length() - 1);
+        return builder.toString();
     }
 
 }

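For reference, the space-delimited "family:qualifier" string built by getScanColumns() above is the format that org.apache.hadoop.hbase.mapreduce.TableInputFormat accepts through its SCAN_COLUMNS property (the wiring that actually sets the property is outside this hunk). A minimal illustrative sketch, with a hypothetical table name and columns:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat;

    public class ScanColumnsExample {
        public static Configuration configureScan() {
            Configuration conf = HBaseConfiguration.create();
            // Hypothetical table; "info" and "stats" are example column families.
            conf.set(TableInputFormat.INPUT_TABLE, "my_table");
            // Same space-delimited family:qualifier format that getScanColumns() produces.
            conf.set(TableInputFormat.SCAN_COLUMNS, "info:name info:age stats:count");
            return conf;
        }
    }
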
Modified: incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java?rev=1296572&r1=1296571&r2=1296572&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java Sat Mar  3 03:56:13 2012
@@ -21,33 +21,31 @@ package org.apache.hcatalog.hbase;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableSplit;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-import org.apache.hadoop.hbase.mapreduce.TableSplit;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapred.HCatMapRedUtil;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.mapreduce.InputJobInfo;
 
 /**
  * This class HBaseInputFormat is a wrapper class of TableInputFormat in HBase.
  */
-class HBaseInputFormat extends InputFormat<ImmutableBytesWritable, Result> implements Configurable{
+class HBaseInputFormat implements InputFormat<ImmutableBytesWritable, Result> {
 
     private final TableInputFormat inputFormat;
-    private final InputJobInfo jobInfo;
-    private Configuration conf;
 
-    public HBaseInputFormat(InputJobInfo jobInfo) {
+    public HBaseInputFormat() {
         inputFormat = new TableInputFormat();
-        this.jobInfo = jobInfo;
     }
 
     /*
@@ -67,20 +65,27 @@ class HBaseInputFormat extends InputForm
      * org.apache.hadoop.mapreduce.TaskAttemptContext)
      */
     @Override
-    public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
-            InputSplit split, TaskAttemptContext tac) throws IOException,
-            InterruptedException {
-
-          String tableName = inputFormat.getConf().get(TableInputFormat.INPUT_TABLE);
-          TableSplit tSplit = (TableSplit) split;
-          HbaseSnapshotRecordReader recordReader = new HbaseSnapshotRecordReader(jobInfo);
-          Scan sc = new Scan(inputFormat.getScan());
-          sc.setStartRow(tSplit.getStartRow());
-          sc.setStopRow(tSplit.getEndRow());
-          recordReader.setScan(sc);
-          recordReader.setHTable(new HTable(this.conf, tableName));
-          recordReader.init();
-          return recordReader;
+    public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
+            InputSplit split, JobConf job, Reporter reporter)
+            throws IOException {
+        String jobString = job.get(HCatConstants.HCAT_KEY_JOB_INFO);
+        InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(jobString);
+
+        String tableName = job.get(TableInputFormat.INPUT_TABLE);
+        TableSplit tSplit = (TableSplit) split;
+        HbaseSnapshotRecordReader recordReader = new HbaseSnapshotRecordReader(inputJobInfo, job);
+        inputFormat.setConf(job);
+        Scan inputScan = inputFormat.getScan();
+        // TODO: Make the caching configurable by the user
+        inputScan.setCaching(200);
+        inputScan.setCacheBlocks(false);
+        Scan sc = new Scan(inputScan);
+        sc.setStartRow(tSplit.getStartRow());
+        sc.setStopRow(tSplit.getEndRow());
+        recordReader.setScan(sc);
+        recordReader.setHTable(new HTable(job, tableName));
+        recordReader.init();
+        return recordReader;
     }
 
     /*
@@ -97,35 +102,24 @@ class HBaseInputFormat extends InputForm
      * .JobContext)
      */
     @Override
-    public List<InputSplit> getSplits(JobContext jobContext)
-            throws IOException, InterruptedException {
-
-        String tableName = this.conf.get(TableInputFormat.INPUT_TABLE);
-        if (tableName == null) {
-           throw new IOException("The input table is not set. The input splits cannot be created.");
+    public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits)
+            throws IOException {
+        inputFormat.setConf(job);
+        return convertSplits(inputFormat.getSplits(HCatMapRedUtil.createJobContext(job, null,
+                Reporter.NULL)));
+    }
+
+    private InputSplit[] convertSplits(List<org.apache.hadoop.mapreduce.InputSplit> splits) {
+        InputSplit[] converted = new InputSplit[splits.size()];
+        for (int i = 0; i < splits.size(); i++) {
+            org.apache.hadoop.hbase.mapreduce.TableSplit tableSplit =
+                    (org.apache.hadoop.hbase.mapreduce.TableSplit) splits.get(i);
+            TableSplit newTableSplit = new TableSplit(tableSplit.getTableName(),
+                    tableSplit.getStartRow(),
+                    tableSplit.getEndRow(), tableSplit.getRegionLocation());
+            converted[i] = newTableSplit;
         }
-        return inputFormat.getSplits(jobContext);
-    }
-
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-        inputFormat.setConf(conf);
-    }
-
-    public Scan getScan() {
-        return inputFormat.getScan();
-    }
-
-    public void setScan(Scan scan) {
-        inputFormat.setScan(scan);
-    }
-
-    /* @return
-     * @see org.apache.hadoop.conf.Configurable#getConf()
-     */
-    @Override
-    public Configuration getConf() {
-       return this.conf;
+        return converted;
     }
 
 }

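With HBaseInputFormat now implementing the old org.apache.hadoop.mapred.InputFormat API, a caller drives it with the createKey()/createValue()/next() pattern instead of nextKeyValue(). A minimal usage sketch, assuming code in the same package (the class is package-private) and a JobConf already prepared by the storage handler with the HCat job info and snapshot properties:

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;

    class HBaseInputFormatUsageSketch {
        static void readAll(JobConf job) throws IOException {
            HBaseInputFormat inputFormat = new HBaseInputFormat();
            for (InputSplit split : inputFormat.getSplits(job, 1)) {
                RecordReader<ImmutableBytesWritable, Result> reader =
                        inputFormat.getRecordReader(split, job, Reporter.NULL);
                ImmutableBytesWritable key = reader.createKey();
                Result value = reader.createValue();
                while (reader.next(key, value)) {
                    // process the current row key and Result here
                }
                reader.close();
            }
        }
    }
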
Modified: incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java?rev=1296572&r1=1296571&r2=1296572&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java Sat Mar  3 03:56:13 2012
@@ -20,6 +20,7 @@ package org.apache.hcatalog.hbase;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -27,15 +28,20 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.hbase.snapshot.FamilyRevision;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
 import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
 import org.apache.hcatalog.mapreduce.InputJobInfo;
 
@@ -43,59 +49,93 @@ import org.apache.hcatalog.mapreduce.Inp
  * The Class HbaseSnapshotRecordReader implements logic for filtering records
  * based on snapshot.
  */
-class HbaseSnapshotRecordReader extends TableRecordReader {
+class HbaseSnapshotRecordReader implements RecordReader<ImmutableBytesWritable, Result> {
 
     static final Log LOG = LogFactory.getLog(HbaseSnapshotRecordReader.class);
+    private final InputJobInfo inpJobInfo;
+    private final Configuration conf;
+    private final int maxRevisions = 1;
     private ResultScanner scanner;
     private Scan  scan;
     private HTable  htable;
-    private ImmutableBytesWritable key;
-    private Result value;
-    private InputJobInfo inpJobInfo;
     private TableSnapshot snapshot;
-    private int maxRevisions;
     private Iterator<Result> resultItr;
+    private Set<Long> allAbortedTransactions;
+    private DataOutputBuffer valueOut = new DataOutputBuffer();
+    private DataInputBuffer valueIn = new DataInputBuffer();
 
-
-    HbaseSnapshotRecordReader(InputJobInfo inputJobInfo) throws IOException {
+    HbaseSnapshotRecordReader(InputJobInfo inputJobInfo, Configuration conf) throws IOException {
         this.inpJobInfo = inputJobInfo;
-        String snapshotString = inpJobInfo.getProperties().getProperty(
-                HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
+        this.conf = conf;
+        String snapshotString = conf.get(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
         HCatTableSnapshot hcatSnapshot = (HCatTableSnapshot) HCatUtil
                 .deserialize(snapshotString);
-        this.snapshot = HBaseInputStorageDriver.convertSnapshot(hcatSnapshot,
+        this.snapshot = HBaseRevisionManagerUtil.convertSnapshot(hcatSnapshot,
                 inpJobInfo.getTableInfo());
-        this.maxRevisions = 1;
     }
 
-    /* @param firstRow The first record in the split.
-    /* @throws IOException
-     * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#restart(byte[])
-     */
-    @Override
+    public void init() throws IOException {
+        restart(scan.getStartRow());
+    }
+
     public void restart(byte[] firstRow) throws IOException {
+        allAbortedTransactions = getAbortedTransactions(Bytes.toString(htable.getTableName()), scan);
+        long maxValidRevision = snapshot.getLatestRevision();
+        while (allAbortedTransactions.contains(maxValidRevision)) {
+            maxValidRevision--;
+        }
+        long minValidRevision = getMinimumRevision(scan, snapshot);
+        while (allAbortedTransactions.contains(minValidRevision)) {
+            minValidRevision--;
+        }
         Scan newScan = new Scan(scan);
         newScan.setStartRow(firstRow);
+        //TODO: See if filters in 0.92 can be used to optimize the scan
+        //TODO: Consider creating a custom snapshot filter
+        newScan.setTimeRange(minValidRevision, maxValidRevision + 1);
+        newScan.setMaxVersions();
         this.scanner = this.htable.getScanner(newScan);
         resultItr = this.scanner.iterator();
     }
 
-    /* @throws IOException
-     * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#init()
-     */
-    @Override
-    public void init() throws IOException {
-        restart(scan.getStartRow());
+    private Set<Long> getAbortedTransactions(String tableName, Scan scan) throws IOException {
+        Set<Long> abortedTransactions = new HashSet<Long>();
+        RevisionManager rm = null;
+        try {
+            rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+            byte[][] families = scan.getFamilies();
+            for (byte[] familyKey : families) {
+                String family = Bytes.toString(familyKey);
+                List<FamilyRevision> abortedWriteTransactions = rm.getAbortedWriteTransactions(
+                        tableName, family);
+                if (abortedWriteTransactions != null) {
+                    for (FamilyRevision revision : abortedWriteTransactions) {
+                        abortedTransactions.add(revision.getRevision());
+                    }
+                }
+            }
+            return abortedTransactions;
+        } finally {
+            HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm);
+        }
+    }
+
+    private long getMinimumRevision(Scan scan, TableSnapshot snapshot) {
+        long minRevision = snapshot.getLatestRevision();
+        byte[][] families = scan.getFamilies();
+        for (byte[] familyKey : families) {
+            String family = Bytes.toString(familyKey);
+            long revision = snapshot.getRevision(family);
+            if (revision < minRevision)
+                minRevision = revision;
+        }
+        return minRevision;
     }
 
     /*
      * @param htable The HTable ( of HBase) to use for the record reader.
      *
-     * @see
-     * org.apache.hadoop.hbase.mapreduce.TableRecordReader#setHTable(org.apache
-     * .hadoop.hbase.client.HTable)
      */
-    @Override
     public void setHTable(HTable htable) {
         this.htable = htable;
     }
@@ -103,64 +143,51 @@ class HbaseSnapshotRecordReader extends 
     /*
      * @param scan The scan to be used for reading records.
      *
-     * @see
-     * org.apache.hadoop.hbase.mapreduce.TableRecordReader#setScan(org.apache
-     * .hadoop.hbase.client.Scan)
      */
-    @Override
     public void setScan(Scan scan) {
         this.scan = scan;
     }
 
-    /*
-     * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#close()
-     */
     @Override
-    public void close() {
-        this.resultItr = null;
-        this.scanner.close();
+    public ImmutableBytesWritable createKey() {
+        return new ImmutableBytesWritable();
     }
 
-    /* @return The row of hbase record.
-    /* @throws IOException
-    /* @throws InterruptedException
-     * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#getCurrentKey()
-     */
     @Override
-    public ImmutableBytesWritable getCurrentKey() throws IOException,
-            InterruptedException {
-        return key;
+    public Result createValue() {
+        return new Result();
     }
 
-    /* @return Single row result of scan of HBase table.
-    /* @throws IOException
-    /* @throws InterruptedException
-     * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#getCurrentValue()
-     */
     @Override
-    public Result getCurrentValue() throws IOException, InterruptedException {
-        return value;
+    public long getPos() {
+        // This should be the ordinal tuple in the range;
+        // not clear how to calculate...
+        return 0;
     }
 
-    /* @return Returns whether a next key-value is available for reading.
-     * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#nextKeyValue()
-     */
     @Override
-    public boolean nextKeyValue() {
+    public float getProgress() throws IOException {
+        // Depends on the total number of tuples
+        return 0;
+    }
 
+    @Override
+    public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
         if (this.resultItr == null) {
             LOG.warn("The HBase result iterator is found null. It is possible"
                     + " that the record reader has already been closed.");
         } else {
-
-            if (key == null)
-                key = new ImmutableBytesWritable();
             while (resultItr.hasNext()) {
                 Result temp = resultItr.next();
                 Result hbaseRow = prepareResult(temp.list());
                 if (hbaseRow != null) {
+                    // Update key and value. Currently no way to avoid serialization/de-serialization
+                    // as no setters are available.
                     key.set(hbaseRow.getRow());
-                    value = hbaseRow;
+                    valueOut.reset();
+                    hbaseRow.write(valueOut);
+                    valueIn.reset(valueOut.getData(), valueOut.getLength());
+                    value.readFields(valueIn);
                     return true;
                 }
 
@@ -185,6 +212,11 @@ class HbaseSnapshotRecordReader extends 
             }
 
             String family = Bytes.toString(kv.getFamily());
+            //Ignore aborted transactions
+            if (allAbortedTransactions.contains(kv.getTimestamp())) {
+                continue;
+            }
+
             long desiredTS = snapshot.getRevision(family);
             if (kv.getTimestamp() <= desiredTS) {
                 kvs.add(kv);
@@ -213,13 +245,13 @@ class HbaseSnapshotRecordReader extends 
         }
     }
 
-    /* @return The progress of the record reader.
-     * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#getProgress()
+    /*
+     * @see org.apache.hadoop.hbase.mapred.TableRecordReader#close()
      */
     @Override
-    public float getProgress() {
-        // Depends on the total number of tuples
-        return 0;
+    public void close() {
+        this.resultItr = null;
+        this.scanner.close();
     }
 
 }

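The filtering applied by restart()/next()/prepareResult() above reduces to a per-KeyValue visibility rule: drop cells written by aborted transactions, and drop cells newer than the snapshot revision recorded for their column family (keeping at most maxRevisions versions per column). A condensed sketch of that predicate, not the committed code:

    import java.util.Set;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hcatalog.hbase.snapshot.TableSnapshot;

    class SnapshotVisibilitySketch {
        /** Returns true if the cell should be visible under the given snapshot. */
        static boolean isVisible(KeyValue kv, TableSnapshot snapshot, Set<Long> abortedRevisions) {
            if (abortedRevisions.contains(kv.getTimestamp())) {
                return false;                                // written by an aborted transaction
            }
            String family = Bytes.toString(kv.getFamily());
            return kv.getTimestamp() <= snapshot.getRevision(family);   // within the snapshot
        }
    }
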
Modified: incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java?rev=1296572&r1=1296571&r2=1296572&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java Sat Mar  3 03:56:13 2012
@@ -25,7 +25,7 @@ package org.apache.hcatalog.hbase.snapsh
  * family and stored in the corresponding znode. When a write transaction is
  * committed, the transaction object is removed from the list.
  */
-class FamilyRevision implements
+public class FamilyRevision implements
         Comparable<FamilyRevision> {
 
     private long revision;
@@ -42,11 +42,11 @@ class FamilyRevision implements
         this.timestamp = ts;
     }
 
-    long getRevision() {
+    public long getRevision() {
         return revision;
     }
 
-    long getExpireTimestamp() {
+    public long getExpireTimestamp() {
         return timestamp;
     }
 

Modified: incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java?rev=1296572&r1=1296571&r2=1296572&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java Sat Mar  3 03:56:13 2012
@@ -89,6 +89,17 @@ public interface RevisionManager {
             throws IOException;
 
     /**
+     * Get the list of aborted write transactions for a column family.
+     *
+     * @param table the table name
+     * @param columnFamily the column family name
+     * @return a list of FamilyRevision objects for aborted write transactions
+     * @throws java.io.IOException
+     */
+    public List<FamilyRevision> getAbortedWriteTransactions(String table,
+        String columnFamily) throws IOException;
+
+    /**
      * Create the latest snapshot of the table.
      *
      * @param tableName

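Promoting getAbortedWriteTransactions() to the RevisionManager interface lets readers collect aborted revisions without depending on ZKBasedRevisionManager directly. A minimal usage sketch mirroring the record reader's getAbortedTransactions(), assuming the HBaseRevisionManagerUtil helpers are accessible from the caller's package and with error handling elided:

    import java.io.IOException;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hcatalog.hbase.HBaseRevisionManagerUtil;
    import org.apache.hcatalog.hbase.snapshot.FamilyRevision;
    import org.apache.hcatalog.hbase.snapshot.RevisionManager;

    class AbortedRevisionsSketch {
        static Set<Long> abortedRevisions(Configuration conf, String table, String family)
                throws IOException {
            RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
            try {
                Set<Long> revisions = new HashSet<Long>();
                List<FamilyRevision> aborted = rm.getAbortedWriteTransactions(table, family);
                if (aborted != null) {
                    for (FamilyRevision fr : aborted) {
                        revisions.add(fr.getRevision());
                    }
                }
                return revisions;
            } finally {
                HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm);
            }
        }
    }
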
Modified: incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java?rev=1296572&r1=1296571&r2=1296572&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java Sat Mar  3 03:56:13 2012
@@ -365,14 +365,8 @@ public class ZKBasedRevisionManager impl
         return zkUtil.getTransactionList(path);
     }
 
-    /**
-     * Get the list of aborted Transactions for a column family
-     * @param table the table name
-     * @param columnFamily the column family name
-     * @return a list of aborted WriteTransactions
-     * @throws java.io.IOException
-     */
-     List<FamilyRevision> getAbortedWriteTransactions(String table,
+    @Override
+     public List<FamilyRevision> getAbortedWriteTransactions(String table,
             String columnFamily) throws IOException {
          String path = PathUtil.getAbortInformationPath(baseDir, table, columnFamily);
          return zkUtil.getTransactionList(path);