Posted to hcatalog-commits@incubator.apache.org by to...@apache.org on 2012/03/03 02:56:06 UTC

svn commit: r1296568 [2/3] - in /incubator/hcatalog/trunk: ./ src/java/org/apache/hcatalog/mapreduce/ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ storage-drivers/hbase/src...

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java?rev=1296568&r1=1296567&r2=1296568&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java Sat Mar  3 02:56:05 2012
@@ -20,6 +20,7 @@ package org.apache.hcatalog.hbase;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -27,15 +28,20 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.hbase.snapshot.FamilyRevision;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
 import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
 import org.apache.hcatalog.mapreduce.InputJobInfo;
 
@@ -43,59 +49,93 @@ import org.apache.hcatalog.mapreduce.Inp
  * The Class HbaseSnapshotRecordReader implements logic for filtering records
  * based on snapshot.
  */
-class HbaseSnapshotRecordReader extends TableRecordReader {
+class HbaseSnapshotRecordReader implements RecordReader<ImmutableBytesWritable, Result> {
 
     static final Log LOG = LogFactory.getLog(HbaseSnapshotRecordReader.class);
+    private final InputJobInfo inpJobInfo;
+    private final Configuration conf;
+    private final int maxRevisions = 1;
     private ResultScanner scanner;
     private Scan  scan;
     private HTable  htable;
-    private ImmutableBytesWritable key;
-    private Result value;
-    private InputJobInfo inpJobInfo;
     private TableSnapshot snapshot;
-    private int maxRevisions;
     private Iterator<Result> resultItr;
+    private Set<Long> allAbortedTransactions;
+    private DataOutputBuffer valueOut = new DataOutputBuffer();
+    private DataInputBuffer valueIn = new DataInputBuffer();
 
-
-    HbaseSnapshotRecordReader(InputJobInfo inputJobInfo) throws IOException {
+    HbaseSnapshotRecordReader(InputJobInfo inputJobInfo, Configuration conf) throws IOException {
         this.inpJobInfo = inputJobInfo;
-        String snapshotString = inpJobInfo.getProperties().getProperty(
-                HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
+        this.conf = conf;
+        String snapshotString = conf.get(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
         HCatTableSnapshot hcatSnapshot = (HCatTableSnapshot) HCatUtil
                 .deserialize(snapshotString);
-        this.snapshot = HBaseInputStorageDriver.convertSnapshot(hcatSnapshot,
+        this.snapshot = HBaseRevisionManagerUtil.convertSnapshot(hcatSnapshot,
                 inpJobInfo.getTableInfo());
-        this.maxRevisions = 1;
     }
 
-    /* @param firstRow The first record in the split.
-    /* @throws IOException
-     * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#restart(byte[])
-     */
-    @Override
+    public void init() throws IOException {
+        restart(scan.getStartRow());
+    }
+
     public void restart(byte[] firstRow) throws IOException {
+        allAbortedTransactions = getAbortedTransactions(Bytes.toString(htable.getTableName()), scan);
+        long maxValidRevision = snapshot.getLatestRevision();
+        while (allAbortedTransactions.contains(maxValidRevision)) {
+            maxValidRevision--;
+        }
+        long minValidRevision = getMinimumRevision(scan, snapshot);
+        while (allAbortedTransactions.contains(minValidRevision)) {
+            minValidRevision--;
+        }
         Scan newScan = new Scan(scan);
         newScan.setStartRow(firstRow);
+        //TODO: See if filters in 0.92 can be used to optimize the scan
+        //TODO: Consider creating a custom snapshot filter
+        newScan.setTimeRange(minValidRevision, maxValidRevision + 1);
+        newScan.setMaxVersions();
         this.scanner = this.htable.getScanner(newScan);
         resultItr = this.scanner.iterator();
     }
 
-    /* @throws IOException
-     * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#init()
-     */
-    @Override
-    public void init() throws IOException {
-        restart(scan.getStartRow());
+    private Set<Long> getAbortedTransactions(String tableName, Scan scan) throws IOException {
+        Set<Long> abortedTransactions = new HashSet<Long>();
+        RevisionManager rm = null;
+        try {
+            rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+            byte[][] families = scan.getFamilies();
+            for (byte[] familyKey : families) {
+                String family = Bytes.toString(familyKey);
+                List<FamilyRevision> abortedWriteTransactions = rm.getAbortedWriteTransactions(
+                        tableName, family);
+                if (abortedWriteTransactions != null) {
+                    for (FamilyRevision revision : abortedWriteTransactions) {
+                        abortedTransactions.add(revision.getRevision());
+                    }
+                }
+            }
+            return abortedTransactions;
+        } finally {
+            HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm);
+        }
+    }
+
+    private long getMinimumRevision(Scan scan, TableSnapshot snapshot) {
+        long minRevision = snapshot.getLatestRevision();
+        byte[][] families = scan.getFamilies();
+        for (byte[] familyKey : families) {
+            String family = Bytes.toString(familyKey);
+            long revision = snapshot.getRevision(family);
+            if (revision < minRevision)
+                minRevision = revision;
+        }
+        return minRevision;
     }
 
     /*
      * @param htable The HTable ( of HBase) to use for the record reader.
      *
-     * @see
-     * org.apache.hadoop.hbase.mapreduce.TableRecordReader#setHTable(org.apache
-     * .hadoop.hbase.client.HTable)
      */
-    @Override
     public void setHTable(HTable htable) {
         this.htable = htable;
     }
@@ -103,64 +143,51 @@ class HbaseSnapshotRecordReader extends 
     /*
      * @param scan The scan to be used for reading records.
      *
-     * @see
-     * org.apache.hadoop.hbase.mapreduce.TableRecordReader#setScan(org.apache
-     * .hadoop.hbase.client.Scan)
      */
-    @Override
     public void setScan(Scan scan) {
         this.scan = scan;
     }
 
-    /*
-     * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#close()
-     */
     @Override
-    public void close() {
-        this.resultItr = null;
-        this.scanner.close();
+    public ImmutableBytesWritable createKey() {
+        return new ImmutableBytesWritable();
     }
 
-    /* @return The row of hbase record.
-    /* @throws IOException
-    /* @throws InterruptedException
-     * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#getCurrentKey()
-     */
     @Override
-    public ImmutableBytesWritable getCurrentKey() throws IOException,
-            InterruptedException {
-        return key;
+    public Result createValue() {
+        return new Result();
     }
 
-    /* @return Single row result of scan of HBase table.
-    /* @throws IOException
-    /* @throws InterruptedException
-     * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#getCurrentValue()
-     */
     @Override
-    public Result getCurrentValue() throws IOException, InterruptedException {
-        return value;
+    public long getPos() {
+        // This should be the ordinal tuple in the range;
+        // not clear how to calculate...
+        return 0;
     }
 
-    /* @return Returns whether a next key-value is available for reading.
-     * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#nextKeyValue()
-     */
     @Override
-    public boolean nextKeyValue() {
+    public float getProgress() throws IOException {
+        // Depends on the total number of tuples
+        return 0;
+    }
 
+    @Override
+    public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
         if (this.resultItr == null) {
             LOG.warn("The HBase result iterator is found null. It is possible"
                     + " that the record reader has already been closed.");
         } else {
-
-            if (key == null)
-                key = new ImmutableBytesWritable();
             while (resultItr.hasNext()) {
                 Result temp = resultItr.next();
                 Result hbaseRow = prepareResult(temp.list());
                 if (hbaseRow != null) {
+                    // Update key and value. Currently no way to avoid serialization/de-serialization
+                    // as no setters are available.
                     key.set(hbaseRow.getRow());
-                    value = hbaseRow;
+                    valueOut.reset();
+                    hbaseRow.write(valueOut);
+                    valueIn.reset(valueOut.getData(), valueOut.getLength());
+                    value.readFields(valueIn);
                     return true;
                 }
 
@@ -185,6 +212,11 @@ class HbaseSnapshotRecordReader extends 
             }
 
             String family = Bytes.toString(kv.getFamily());
+            //Ignore aborted transactions
+            if (allAbortedTransactions.contains(kv.getTimestamp())) {
+                continue;
+            }
+
             long desiredTS = snapshot.getRevision(family);
             if (kv.getTimestamp() <= desiredTS) {
                 kvs.add(kv);
@@ -213,13 +245,13 @@ class HbaseSnapshotRecordReader extends 
         }
     }
 
-    /* @return The progress of the record reader.
-     * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#getProgress()
+    /*
+     * @see org.apache.hadoop.hbase.mapred.TableRecordReader#close()
      */
     @Override
-    public float getProgress() {
-        // Depends on the total number of tuples
-        return 0;
+    public void close() {
+        this.resultItr = null;
+        this.scanner.close();
     }
 
 }

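A note on the key/value handling above: the reworked reader implements the old-API RecordReader<ImmutableBytesWritable, Result>, so next(key, value) must populate the objects supplied by the framework. Since Result exposes no setters in this HBase version, the scanned row is copied through a Writable round-trip. A minimal sketch of that idiom (the helper class and method names are illustrative, not part of the commit):

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    // Illustrative helper: copy one scanned Result into the Result instance
    // handed in by the MapReduce framework, reusing both buffers across rows.
    class ResultCopier {
        private final DataOutputBuffer valueOut = new DataOutputBuffer();
        private final DataInputBuffer valueIn = new DataInputBuffer();

        void copy(Result scanned, Result target) throws IOException {
            valueOut.reset();                         // clear the reusable output buffer
            scanned.write(valueOut);                  // serialize the scanned row
            valueIn.reset(valueOut.getData(), valueOut.getLength());
            target.readFields(valueIn);               // rebuild it in the caller's object
        }
    }
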
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java?rev=1296568&r1=1296567&r2=1296568&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java Sat Mar  3 02:56:05 2012
@@ -25,7 +25,7 @@ package org.apache.hcatalog.hbase.snapsh
  * family and stored in the corresponding znode. When a write transaction is
  * committed, the transaction object is removed from the list.
  */
-class FamilyRevision implements
+public class FamilyRevision implements
         Comparable<FamilyRevision> {
 
     private long revision;
@@ -42,11 +42,11 @@ class FamilyRevision implements
         this.timestamp = ts;
     }
 
-    long getRevision() {
+    public long getRevision() {
         return revision;
     }
 
-    long getExpireTimestamp() {
+    public long getExpireTimestamp() {
         return timestamp;
     }
 

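FamilyRevision and its getters become public so that code outside the snapshot package, such as the record reader above, can read the revision number of each aborted write transaction. Those revision numbers double as HBase cell timestamps, which is how prepareResult() decides which cells to drop. A rough sketch of that per-cell check, assuming the aborted revisions were already collected into a Set<Long> (the helper class is illustrative, and the per-column limiting to maxRevisions versions is omitted):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hcatalog.hbase.snapshot.TableSnapshot;

    // Illustrative filter: keep only cells that are visible at the snapshot
    // revision of their family and were not written by an aborted transaction.
    class SnapshotCellFilter {
        static List<KeyValue> filter(List<KeyValue> cells, Set<Long> abortedRevisions,
                TableSnapshot snapshot) {
            List<KeyValue> kept = new ArrayList<KeyValue>();
            for (KeyValue kv : cells) {
                if (abortedRevisions.contains(kv.getTimestamp())) {
                    continue;                         // written by an aborted transaction
                }
                String family = Bytes.toString(kv.getFamily());
                if (kv.getTimestamp() <= snapshot.getRevision(family)) {
                    kept.add(kv);                     // visible at this snapshot
                }
            }
            return kept;
        }
    }
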
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java?rev=1296568&r1=1296567&r2=1296568&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java Sat Mar  3 02:56:05 2012
@@ -89,6 +89,17 @@ public interface RevisionManager {
             throws IOException;
 
     /**
+     * Get the list of aborted write transactions for a column family.
+     *
+     * @param table the table name
+     * @param columnFamily the column family name
+     * @return a list of aborted WriteTransactions
+     * @throws java.io.IOException
+     */
+    public List<FamilyRevision> getAbortedWriteTransactions(String table,
+        String columnFamily) throws IOException;
+
+    /**
      * Create the latest snapshot of the table.
      *
      * @param tableName

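The interface now exposes aborted write transactions so that storage-driver code can find out which revisions to skip when reading. A hedged sketch of typical usage, mirroring getAbortedTransactions() in HbaseSnapshotRecordReader (obtaining and closing the RevisionManager is left out, and the helper class is illustrative):

    import java.io.IOException;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    import org.apache.hcatalog.hbase.snapshot.FamilyRevision;
    import org.apache.hcatalog.hbase.snapshot.RevisionManager;

    // Illustrative helper: gather the revision numbers of all aborted write
    // transactions for one table/column-family pair.
    class AbortedRevisions {
        static Set<Long> forFamily(RevisionManager rm, String table, String family)
                throws IOException {
            Set<Long> aborted = new HashSet<Long>();
            List<FamilyRevision> txns = rm.getAbortedWriteTransactions(table, family);
            if (txns != null) {
                for (FamilyRevision revision : txns) {
                    aborted.add(revision.getRevision());
                }
            }
            return aborted;
        }
    }
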
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java?rev=1296568&r1=1296567&r2=1296568&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java Sat Mar  3 02:56:05 2012
@@ -365,14 +365,8 @@ public class ZKBasedRevisionManager impl
         return zkUtil.getTransactionList(path);
     }
 
-    /**
-     * Get the list of aborted Transactions for a column family
-     * @param table the table name
-     * @param columnFamily the column family name
-     * @return a list of aborted WriteTransactions
-     * @throws java.io.IOException
-     */
-     List<FamilyRevision> getAbortedWriteTransactions(String table,
+    @Override
+     public List<FamilyRevision> getAbortedWriteTransactions(String table,
             String columnFamily) throws IOException {
          String path = PathUtil.getAbortInformationPath(baseDir, table, columnFamily);
          return zkUtil.getTransactionList(path);

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java?rev=1296568&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java Sat Mar  3 02:56:05 2012
@@ -0,0 +1,609 @@
+package org.apache.hcatalog.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hcatalog.cli.HCatDriver;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.TestHBaseDirectOutputFormat.MapReadAbortedTransaction;
+import org.apache.hcatalog.hbase.TestHBaseDirectOutputFormat.MapWriteAbortTransaction;
+import org.apache.hcatalog.hbase.snapshot.FamilyRevision;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
+import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests components of HBaseHCatStorageHandler using ManyMiniCluster,
+ * including ImportSequenceFile and HBaseBulkOutputFormat.
+ */
+public class TestHBaseBulkOutputFormat extends SkeletonHBaseTest {
+    private final static Log LOG = LogFactory.getLog(TestHBaseBulkOutputFormat.class);
+
+    private final HiveConf allConf;
+    private final HCatDriver hcatDriver;
+
+    public TestHBaseBulkOutputFormat() {
+        allConf = getHiveConf();
+        allConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+                HCatSemanticAnalyzer.class.getName());
+        allConf.set(HiveConf.ConfVars.HADOOPFS.varname, getFileSystem().getUri().toString());
+        allConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, new Path(getTestDir(),"warehouse").toString());
+
+        //Add hbase properties
+        for (Map.Entry<String, String> el : getHbaseConf())
+            allConf.set(el.getKey(), el.getValue());
+        for (Map.Entry<String, String> el : getJobConf())
+            allConf.set(el.getKey(), el.getValue());
+
+        SessionState.start(new CliSessionState(allConf));
+        hcatDriver = new HCatDriver();
+    }
+
+    public static class MapWriteOldMapper implements org.apache.hadoop.mapred.Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
+
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public void configure(JobConf job) {
+        }
+
+        @Override
+        public void map(LongWritable key, Text value,
+                OutputCollector<ImmutableBytesWritable, Put> output,
+                Reporter reporter) throws IOException {
+            String vals[] = value.toString().split(",");
+            Put put = new Put(Bytes.toBytes(vals[0]));
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                put.add(Bytes.toBytes("my_family"),
+                        Bytes.toBytes(pair[0]),
+                        Bytes.toBytes(pair[1]));
+            }
+            output.collect(new ImmutableBytesWritable(Bytes.toBytes(vals[0])),put);
+        }
+
+    }
+
+    public static class MapWrite extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
+
+        @Override
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            String vals[] = value.toString().split(",");
+            Put put = new Put(Bytes.toBytes(vals[0]));
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                put.add(Bytes.toBytes("my_family"),
+                        Bytes.toBytes(pair[0]),
+                        Bytes.toBytes(pair[1]));
+            }
+            context.write(new ImmutableBytesWritable(Bytes.toBytes(vals[0])),put);
+        }
+    }
+
+    public static class MapHCatWrite extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+        @Override
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            OutputJobInfo jobInfo = (OutputJobInfo)HCatUtil.deserialize(context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+            HCatRecord record = new DefaultHCatRecord(3);
+            HCatSchema schema = jobInfo.getOutputSchema();
+            String vals[] = value.toString().split(",");
+            record.setInteger("key",schema,Integer.parseInt(vals[0]));
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                record.set(pair[0],schema,pair[1]);
+            }
+            context.write(null,record);
+        }
+    }
+
+    @Test
+    public void hbaseBulkOutputFormatTest() throws IOException, ClassNotFoundException, InterruptedException {
+        String testName = "hbaseBulkOutputFormatTest";
+        Path methodTestDir = new Path(getTestDir(),testName);
+        LOG.info("starting: "+testName);
+
+        String tableName = newTableName(testName).toLowerCase();
+        String familyName = "my_family";
+        byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+        //include hbase config in conf file
+        Configuration conf = new Configuration(allConf);
+
+        //create table
+        conf.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
+        createTable(tableName, new String[]{familyName});
+
+        String data[] = {"1,english:one,spanish:uno",
+                               "2,english:two,spanish:dos",
+                               "3,english:three,spanish:tres"};
+
+
+
+        // input/output settings
+        Path inputPath = new Path(methodTestDir,"mr_input");
+        FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
+        for(String line: data)
+            os.write(Bytes.toBytes(line + "\n"));
+        os.close();
+        Path interPath = new Path(methodTestDir,"inter");
+        //create job
+        JobConf job = new JobConf(conf);
+        job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapWriteOldMapper.class);
+
+        job.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class);
+        org.apache.hadoop.mapred.TextInputFormat.setInputPaths(job, inputPath);
+
+        job.setOutputFormat(HBaseBulkOutputFormat.class);
+        org.apache.hadoop.mapred.SequenceFileOutputFormat.setOutputPath(job, interPath);
+
+        //manually create transaction
+        RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+        try {
+            OutputJobInfo outputJobInfo = OutputJobInfo.create("default", tableName, null);
+            Transaction txn = rm.beginWriteTransaction(tableName, Arrays.asList(familyName));
+            outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
+                                                      HCatUtil.serialize(txn));
+            job.set(HCatConstants.HCAT_KEY_OUTPUT_INFO,
+                                       HCatUtil.serialize(outputJobInfo));
+        } finally {
+            rm.close();
+        }
+
+        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+        job.setMapOutputValueClass(HCatRecord.class);
+
+        job.setOutputKeyClass(ImmutableBytesWritable.class);
+        job.setOutputValueClass(HCatRecord.class);
+
+        job.setNumReduceTasks(0);
+
+        RunningJob runJob = JobClient.runJob(job);
+        runJob.waitForCompletion();
+        assertTrue(runJob.isSuccessful());
+
+        //verify
+        HTable table = new HTable(conf, tableName);
+        Scan scan = new Scan();
+        scan.addFamily(familyNameBytes);
+        ResultScanner scanner = table.getScanner(scan);
+        int index=0;
+        for(Result result: scanner) {
+            String vals[] = data[index].toString().split(",");
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+                assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+            }
+            index++;
+        }
+        //test if load count is the same
+        assertEquals(data.length,index);
+        //test if scratch directory was erased
+        assertFalse(FileSystem.get(job).exists(interPath));
+    }
+
+    @Test
+    public void importSequenceFileTest() throws IOException, ClassNotFoundException, InterruptedException {
+        String testName = "importSequenceFileTest";
+        Path methodTestDir = new Path(getTestDir(),testName);
+        LOG.info("starting: "+testName);
+
+        String tableName = newTableName(testName).toLowerCase();
+        String familyName = "my_family";
+        byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+        //include hbase config in conf file
+        Configuration conf = new Configuration(allConf);
+
+        //create table
+        createTable(tableName,new String[]{familyName});
+
+        String data[] = {"1,english:one,spanish:uno",
+                               "2,english:two,spanish:dos",
+                               "3,english:three,spanish:tres"};
+
+
+
+        // input/output settings
+        Path inputPath = new Path(methodTestDir,"mr_input");
+        getFileSystem().mkdirs(inputPath);
+        FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
+        for(String line: data)
+            os.write(Bytes.toBytes(line + "\n"));
+        os.close();
+        Path interPath = new Path(methodTestDir,"inter");
+        Path scratchPath = new Path(methodTestDir,"scratch");
+
+
+        //create job
+        Job job = new Job(conf, testName);
+        job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapWrite.class);
+
+        job.setInputFormatClass(TextInputFormat.class);
+        TextInputFormat.setInputPaths(job, inputPath);
+
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        SequenceFileOutputFormat.setOutputPath(job,interPath);
+
+        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+        job.setMapOutputValueClass(Put.class);
+
+        job.setOutputKeyClass(ImmutableBytesWritable.class);
+        job.setOutputValueClass(Put.class);
+
+        job.setNumReduceTasks(0);
+        assertTrue(job.waitForCompletion(true));
+
+        job = new Job(new Configuration(allConf),testName+"_importer");
+        assertTrue(ImportSequenceFile.runJob(job, tableName, interPath, scratchPath));
+
+        //verify
+        HTable table = new HTable(conf, tableName);
+        Scan scan = new Scan();
+        scan.addFamily(familyNameBytes);
+        ResultScanner scanner = table.getScanner(scan);
+        int index=0;
+        for(Result result: scanner) {
+            String vals[] = data[index].toString().split(",");
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+                assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+            }
+            index++;
+        }
+        //test if load count is the same
+        assertEquals(data.length,index);
+        //test if scratch directory was erased
+        assertFalse(FileSystem.get(job.getConfiguration()).exists(scratchPath));
+    }
+
+    @Test
+    public void bulkModeHCatOutputFormatTest() throws Exception {
+        String testName = "bulkModeHCatOutputFormatTest";
+        Path methodTestDir = new Path(getTestDir(),testName);
+        LOG.info("starting: "+testName);
+
+        String databaseName = testName.toLowerCase();
+        String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
+        String tableName = newTableName(testName).toLowerCase();
+        String familyName = "my_family";
+        byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+
+        //include hbase config in conf file
+        Configuration conf = new Configuration(allConf);
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+
+        String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'";
+        String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+                              "(key int, english string, spanish string) STORED BY " +
+                              "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+                              "TBLPROPERTIES ('"+HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY+"'='true',"+
+                              "'hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')" ;
+
+        assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+        assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
+
+        String data[] = {"1,english:ONE,spanish:UNO",
+                               "2,english:TWO,spanish:DOS",
+                               "3,english:THREE,spanish:TRES"};
+
+        // input/output settings
+        Path inputPath = new Path(methodTestDir,"mr_input");
+        getFileSystem().mkdirs(inputPath);
+        //create multiple files so we can test with multiple mappers
+        for(int i=0;i<data.length;i++) {
+            FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile"+i+".txt"));
+            os.write(Bytes.toBytes(data[i] + "\n"));
+            os.close();
+        }
+
+        //create job
+        Job job = new Job(conf,testName);
+        job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapHCatWrite.class);
+
+        job.setInputFormatClass(TextInputFormat.class);
+        TextInputFormat.setInputPaths(job, inputPath);
+
+
+        job.setOutputFormatClass(HCatOutputFormat.class);
+        OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,tableName,null);
+        HCatOutputFormat.setOutput(job,outputJobInfo);
+
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(HCatRecord.class);
+
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(HCatRecord.class);
+
+        job.setNumReduceTasks(0);
+
+        assertTrue(job.waitForCompletion(true));
+        RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+        try {
+            TableSnapshot snapshot = rm.createSnapshot(databaseName+"."+tableName);
+            for(String el: snapshot.getColumnFamilies()) {
+                assertEquals(1,snapshot.getRevision(el));
+            }
+        } finally {
+            rm.close();
+        }
+
+        //verify
+        HTable table = new HTable(conf, databaseName+"."+tableName);
+        Scan scan = new Scan();
+        scan.addFamily(familyNameBytes);
+        ResultScanner scanner = table.getScanner(scan);
+        int index=0;
+        for(Result result: scanner) {
+            String vals[] = data[index].toString().split(",");
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+                assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+                assertEquals(1l,result.getColumn(familyNameBytes,Bytes.toBytes(pair[0])).get(0).getTimestamp());
+            }
+            index++;
+        }
+        //test if load count is the same
+        assertEquals(data.length,index);
+    }
+
+    @Test
+    public void bulkModeHCatOutputFormatTestWithDefaultDB() throws Exception {
+        String testName = "bulkModeHCatOutputFormatTestWithDefaultDB";
+        Path methodTestDir = new Path(getTestDir(),testName);
+
+        String databaseName = "default";
+        String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
+        String tableName = newTableName(testName).toLowerCase();
+        String familyName = "my_family";
+        byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+
+        //include hbase config in conf file
+        Configuration conf = new Configuration(allConf);
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+
+        String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'";
+        String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+                              "(key int, english string, spanish string) STORED BY " +
+                              "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+                              "TBLPROPERTIES ('"+HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY+"'='true',"+
+                              "'hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')" ;
+
+        assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+        assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
+
+        String data[] = {"1,english:ONE,spanish:UNO",
+                               "2,english:TWO,spanish:DOS",
+                               "3,english:THREE,spanish:TRES"};
+
+        // input/output settings
+        Path inputPath = new Path(methodTestDir,"mr_input");
+        getFileSystem().mkdirs(inputPath);
+        FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
+        for(String line: data)
+            os.write(Bytes.toBytes(line + "\n"));
+        os.close();
+
+        //create job
+        Job job = new Job(conf,testName);
+        job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapHCatWrite.class);
+
+        job.setInputFormatClass(TextInputFormat.class);
+        TextInputFormat.setInputPaths(job, inputPath);
+
+
+        job.setOutputFormatClass(HCatOutputFormat.class);
+        OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,tableName,null);
+        HCatOutputFormat.setOutput(job,outputJobInfo);
+
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(HCatRecord.class);
+
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(HCatRecord.class);
+
+        job.setNumReduceTasks(0);
+
+        assertTrue(job.waitForCompletion(true));
+
+        //verify
+        HTable table = new HTable(conf, tableName);
+        Scan scan = new Scan();
+        scan.addFamily(familyNameBytes);
+        ResultScanner scanner = table.getScanner(scan);
+        int index=0;
+        for(Result result: scanner) {
+            String vals[] = data[index].toString().split(",");
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+                assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+            }
+            index++;
+        }
+        //test if load count is the same
+        assertEquals(data.length,index);
+    }
+
+    @Test
+    public void bulkModeAbortTest() throws Exception {
+        String testName = "bulkModeAbortTest";
+        Path methodTestDir = new Path(getTestDir(), testName);
+        String databaseName = testName.toLowerCase();
+        String dbDir = new Path(methodTestDir, "DB_" + testName).toString();
+        String tableName = newTableName(testName).toLowerCase();
+        String familyName = "my_family";
+
+        // include hbase config in conf file
+        Configuration conf = new Configuration(allConf);
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+        String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir
+                + "'";
+        String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+                "(key int, english string, spanish string) STORED BY " +
+                "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+                "TBLPROPERTIES ('" + HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY + "'='true'," +
+                "'hbase.columns.mapping'=':key," + familyName + ":english," + familyName
+                + ":spanish')";
+
+        assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+        assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
+
+        String data[] = {"1,english:ONE,spanish:UNO",
+                "2,english:TWO,spanish:DOS",
+                "3,english:THREE,spanish:TRES"};
+
+        Path inputPath = new Path(methodTestDir, "mr_input");
+        getFileSystem().mkdirs(inputPath);
+        // create multiple files so we can test with multiple mappers
+        for (int i = 0; i < data.length; i++) {
+            FSDataOutputStream os = getFileSystem().create(
+                    new Path(inputPath, "inputFile" + i + ".txt"));
+            os.write(Bytes.toBytes(data[i] + "\n"));
+            os.close();
+        }
+
+        Path workingDir = new Path(methodTestDir, "mr_abort");
+        OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,
+                tableName, null);
+        Job job = configureJob(testName,
+                conf, workingDir, MapWriteAbortTransaction.class,
+                outputJobInfo, inputPath);
+        job.waitForCompletion(true);
+        assertFalse(job.waitForCompletion(true));
+
+        // verify that revision manager has it as aborted transaction
+        RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+        try {
+            TableSnapshot snapshot = rm.createSnapshot(databaseName + "." + tableName);
+            for (String family : snapshot.getColumnFamilies()) {
+                assertEquals(1, snapshot.getRevision(family));
+                List<FamilyRevision> abortedWriteTransactions = rm.getAbortedWriteTransactions(
+                        databaseName + "." + tableName, family);
+                assertEquals(1, abortedWriteTransactions.size());
+                assertEquals(1, abortedWriteTransactions.get(0).getRevision());
+            }
+        } finally {
+            rm.close();
+        }
+
+        //verify that hbase does not have any of the records.
+        //Since records are only written during commitJob,
+        //hbase should not have any records.
+        HTable table = new HTable(conf, databaseName + "." + tableName);
+        Scan scan = new Scan();
+        scan.addFamily(Bytes.toBytes(familyName));
+        ResultScanner scanner = table.getScanner(scan);
+        assertFalse(scanner.iterator().hasNext());
+
+        // verify that the input storage driver returns empty results.
+        Path outputDir = new Path(getTestDir(),
+                "mapred/testHBaseTableBulkIgnoreAbortedTransactions");
+        FileSystem fs = getFileSystem();
+        if (fs.exists(outputDir)) {
+            fs.delete(outputDir, true);
+        }
+        job = new Job(conf, "hbase-bulk-aborted-transaction");
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapReadAbortedTransaction.class);
+        job.setInputFormatClass(HCatInputFormat.class);
+        InputJobInfo inputJobInfo = InputJobInfo.create(databaseName,
+                tableName, null, null, null);
+        HCatInputFormat.setInput(job, inputJobInfo);
+        job.setOutputFormatClass(TextOutputFormat.class);
+        TextOutputFormat.setOutputPath(job, outputDir);
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(Text.class);
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(Text.class);
+        job.setNumReduceTasks(0);
+        assertTrue(job.waitForCompletion(true));
+    }
+
+    private Job configureJob(String jobName, Configuration conf,
+            Path workingDir, Class<? extends Mapper> mapperClass,
+            OutputJobInfo outputJobInfo, Path inputPath) throws IOException {
+        Job job = new Job(conf, jobName);
+        job.setWorkingDirectory(workingDir);
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(mapperClass);
+
+        job.setInputFormatClass(TextInputFormat.class);
+        TextInputFormat.setInputPaths(job, inputPath);
+        job.setOutputFormatClass(HCatOutputFormat.class);
+        HCatOutputFormat.setOutput(job, outputJobInfo);
+
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(HCatRecord.class);
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(HCatRecord.class);
+
+        job.setNumReduceTasks(0);
+        return job;
+    }
+
+}
+

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java?rev=1296568&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java Sat Mar  3 02:56:05 2012
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hcatalog.cli.HCatDriver;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.snapshot.FamilyRevision;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
+import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests HBaseDirectOutputFormat and HBaseHCatStorageHandler using a MiniCluster.
+ */
+public class TestHBaseDirectOutputFormat extends SkeletonHBaseTest {
+
+    private final HiveConf allConf;
+    private final HCatDriver hcatDriver;
+
+    public TestHBaseDirectOutputFormat() {
+        allConf = getHiveConf();
+        allConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+                HCatSemanticAnalyzer.class.getName());
+        allConf.set(HiveConf.ConfVars.HADOOPFS.varname, getFileSystem().getUri().toString());
+        allConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, new Path(getTestDir(),"warehouse").toString());
+
+        //Add hbase properties
+        for (Map.Entry<String, String> el : getHbaseConf())
+            allConf.set(el.getKey(), el.getValue());
+        for (Map.Entry<String, String> el : getJobConf())
+            allConf.set(el.getKey(), el.getValue());
+
+        SessionState.start(new CliSessionState(allConf));
+        hcatDriver = new HCatDriver();
+    }
+
+    @Test
+    public void directOutputFormatTest() throws IOException, ClassNotFoundException, InterruptedException {
+        String testName = "directOutputFormatTest";
+        Path methodTestDir = new Path(getTestDir(),testName);
+
+        String tableName = newTableName(testName).toLowerCase();
+        String familyName = "my_family";
+        byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+        //include hbase config in conf file
+        Configuration conf = new Configuration(allConf);
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+        //create table
+        createTable(tableName,new String[]{familyName});
+
+        String data[] = {"1,english:ONE,spanish:UNO",
+                "2,english:ONE,spanish:DOS",
+                "3,english:ONE,spanish:TRES"};
+
+
+
+        // input/output settings
+        Path inputPath = new Path(methodTestDir,"mr_input");
+        getFileSystem().mkdirs(inputPath);
+        FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
+        for(String line: data)
+            os.write(Bytes.toBytes(line + "\n"));
+        os.close();
+
+        //create job
+        JobConf job = new JobConf(conf);
+        job.setJobName(testName);
+        job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapWrite.class);
+
+        job.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class);
+        org.apache.hadoop.mapred.TextInputFormat.setInputPaths(job, inputPath);
+
+        job.setOutputFormat(HBaseDirectOutputFormat.class);
+        job.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
+
+        //manually create transaction
+        RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+        try {
+            OutputJobInfo outputJobInfo = OutputJobInfo.create("default", tableName, null);
+            Transaction txn = rm.beginWriteTransaction(tableName, Arrays.asList(familyName));
+            outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
+                                                      HCatUtil.serialize(txn));
+            job.set(HCatConstants.HCAT_KEY_OUTPUT_INFO,
+                                       HCatUtil.serialize(outputJobInfo));
+        } finally {
+            rm.close();
+        }
+
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(HCatRecord.class);
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(HCatRecord.class);
+        job.setNumReduceTasks(0);
+
+        RunningJob runJob = JobClient.runJob(job);
+        runJob.waitForCompletion();
+        assertTrue(runJob.isSuccessful());
+
+        //verify
+        HTable table = new HTable(conf, tableName);
+        Scan scan = new Scan();
+        scan.addFamily(familyNameBytes);
+        ResultScanner scanner = table.getScanner(scan);
+        int index=0;
+        for(Result result: scanner) {
+            String vals[] = data[index].toString().split(",");
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+                assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+            }
+            index++;
+        }
+        assertEquals(data.length,index);
+    }
+
+    @Test
+    public void directHCatOutputFormatTest() throws Exception {
+        String testName = "directHCatOutputFormatTest";
+        Path methodTestDir = new Path(getTestDir(),testName);
+
+        String databaseName = testName.toLowerCase();
+        String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
+        String tableName = newTableName(testName).toLowerCase();
+        String familyName = "my_family";
+        byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+
+        //include hbase config in conf file
+        Configuration conf = new Configuration(allConf);
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+
+        String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'";
+        String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+                              "(key int, english string, spanish string) STORED BY " +
+                              "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+                              "TBLPROPERTIES (" +
+                              "'hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')" ;
+
+        assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+        assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
+
+        String data[] = {"1,english:ONE,spanish:UNO",
+                               "2,english:ONE,spanish:DOS",
+                               "3,english:ONE,spanish:TRES"};
+
+        // input/output settings
+        Path inputPath = new Path(methodTestDir,"mr_input");
+        getFileSystem().mkdirs(inputPath);
+        //create multiple files so we can test with multiple mappers
+        for(int i=0;i<data.length;i++) {
+            FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile"+i+".txt"));
+            os.write(Bytes.toBytes(data[i] + "\n"));
+            os.close();
+        }
+
+        //create job
+        Path workingDir = new Path(methodTestDir, "mr_work");
+        OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,
+                tableName, null);
+        Job job = configureJob(testName, conf, workingDir, MapHCatWrite.class,
+                outputJobInfo, inputPath);
+        assertTrue(job.waitForCompletion(true));
+
+        RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+        try {
+            TableSnapshot snapshot = rm.createSnapshot(databaseName+"."+tableName);
+            for(String el: snapshot.getColumnFamilies()) {
+                assertEquals(1,snapshot.getRevision(el));
+            }
+        } finally {
+            rm.close();
+        }
+
+        //verify
+        HTable table = new HTable(conf, databaseName+"."+tableName);
+        Scan scan = new Scan();
+        scan.addFamily(familyNameBytes);
+        ResultScanner scanner = table.getScanner(scan);
+        int index=0;
+        for(Result result: scanner) {
+            String vals[] = data[index].toString().split(",");
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+                assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+                assertEquals(1l,result.getColumn(familyNameBytes,Bytes.toBytes(pair[0])).get(0).getTimestamp());
+            }
+            index++;
+        }
+        assertEquals(data.length,index);
+    }
+
+    @Test
+    public void directModeAbortTest() throws Exception {
+        String testName = "directModeAbortTest";
+        Path methodTestDir = new Path(getTestDir(), testName);
+        String databaseName = testName.toLowerCase();
+        String dbDir = new Path(methodTestDir, "DB_" + testName).toString();
+        String tableName = newTableName(testName).toLowerCase();
+        String familyName = "my_family";
+        byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+        // include hbase config in conf file
+        Configuration conf = new Configuration(allConf);
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+        String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir
+                + "'";
+        String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+                "(key int, english string, spanish string) STORED BY " +
+                "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+                "TBLPROPERTIES (" +
+                "'hbase.columns.mapping'=':key," + familyName + ":english," + familyName +
+                ":spanish')";
+
+        assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+        assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
+
+        String data[] = {"1,english:ONE,spanish:UNO",
+                "2,english:TWO,spanish:DOS",
+                "3,english:THREE,spanish:TRES"};
+
+        Path inputPath = new Path(methodTestDir, "mr_input");
+        getFileSystem().mkdirs(inputPath);
+        // create multiple files so we can test with multiple mappers
+        for (int i = 0; i < data.length; i++) {
+            FSDataOutputStream os = getFileSystem().create(
+                    new Path(inputPath, "inputFile" + i + ".txt"));
+            os.write(Bytes.toBytes(data[i] + "\n"));
+            os.close();
+        }
+
+        Path workingDir = new Path(methodTestDir, "mr_abort");
+        OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,
+                tableName, null);
+        Job job = configureJob(testName, conf, workingDir, MapWriteAbortTransaction.class,
+                outputJobInfo, inputPath);
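+        // the job is expected to fail because MapWriteAbortTransaction throws on the record with key 3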
+        assertFalse(job.waitForCompletion(true));
+
+        // verify that revision manager has it as aborted transaction
+        RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+        try {
+            TableSnapshot snapshot = rm.createSnapshot(databaseName + "." + tableName);
+            for (String family : snapshot.getColumnFamilies()) {
+                assertEquals(1, snapshot.getRevision(family));
+                List<FamilyRevision> abortedWriteTransactions = rm.getAbortedWriteTransactions(
+                        databaseName + "." + tableName, family);
+                assertEquals(1, abortedWriteTransactions.size());
+                assertEquals(1, abortedWriteTransactions.get(0).getRevision());
+            }
+        } finally {
+            rm.close();
+        }
+
+        // verify that hbase has the records of the successful maps.
+        HTable table = new HTable(conf, databaseName + "." + tableName);
+        Scan scan = new Scan();
+        scan.addFamily(familyNameBytes);
+        ResultScanner scanner = table.getScanner(scan);
+        int index = 0;
+        for (Result result : scanner) {
+            String vals[] = data[index].split(",");
+            for (int i = 1; i < vals.length; i++) {
+                String pair[] = vals[i].split(":");
+                assertTrue(result.containsColumn(familyNameBytes, Bytes.toBytes(pair[0])));
+                assertEquals(pair[1],
+                        Bytes.toString(result.getValue(familyNameBytes, Bytes.toBytes(pair[0]))));
+                assertEquals(1L, result.getColumn(familyNameBytes, Bytes.toBytes(pair[0])).get(0)
+                        .getTimestamp());
+            }
+            index++;
+        }
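+        // only the rows written by the successful mappers are physically present in HBase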
+        assertEquals(data.length - 1, index);
+
+        // verify that the inputformat returns empty results.
+        Path outputDir = new Path(getTestDir(),
+                "mapred/testHBaseTableIgnoreAbortedTransactions");
+        FileSystem fs = getFileSystem();
+        if (fs.exists(outputDir)) {
+            fs.delete(outputDir, true);
+        }
+        job = new Job(conf, "hbase-aborted-transaction");
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapReadAbortedTransaction.class);
+        job.setInputFormatClass(HCatInputFormat.class);
+        InputJobInfo inputJobInfo = InputJobInfo.create(databaseName,
+                tableName, null, null, null);
+        HCatInputFormat.setInput(job, inputJobInfo);
+        job.setOutputFormatClass(TextOutputFormat.class);
+        TextOutputFormat.setOutputPath(job, outputDir);
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(Text.class);
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(Text.class);
+        job.setNumReduceTasks(0);
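+        // MapReadAbortedTransaction fails the task if it sees any record, so a successful run
+        // proves that rows belonging to the aborted revision were filtered out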
+        assertTrue(job.waitForCompletion(true));
+    }
+
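+    /**
+     * Builds a map-only job that reads text input and writes HCatRecords through HCatOutputFormat.
+     */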
+    private Job configureJob(String jobName, Configuration conf,
+            Path workingDir, Class<? extends Mapper> mapperClass,
+            OutputJobInfo outputJobInfo, Path inputPath) throws IOException {
+        Job job = new Job(conf, jobName);
+        job.setWorkingDirectory(workingDir);
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(mapperClass);
+
+        job.setInputFormatClass(TextInputFormat.class);
+        TextInputFormat.setInputPaths(job, inputPath);
+        job.setOutputFormatClass(HCatOutputFormat.class);
+        HCatOutputFormat.setOutput(job, outputJobInfo);
+
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(HCatRecord.class);
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(HCatRecord.class);
+
+        job.setNumReduceTasks(0);
+        return job;
+    }
+
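+    /**
+     * Parses each "key,column:value,..." input line into an HCatRecord and writes it to the
+     * configured output format.
+     */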
+    public static class MapHCatWrite extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+
+        @Override
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            OutputJobInfo jobInfo = (OutputJobInfo)HCatUtil.deserialize(context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+            HCatRecord record = new DefaultHCatRecord(3);
+            HCatSchema schema = jobInfo.getOutputSchema();
+            String vals[] = value.toString().split(",");
+            record.setInteger("key",schema,Integer.parseInt(vals[0]));
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                record.set(pair[0],schema,pair[1]);
+            }
+            context.write(null,record);
+        }
+    }
+
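+    /**
+     * Old-API (mapred) mapper that turns each input line into an HBase Put against the
+     * "my_family" column family.
+     */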
+    public static class MapWrite implements org.apache.hadoop.mapred.Mapper<LongWritable, Text, BytesWritable, Put> {
+
+        @Override
+        public void configure(JobConf job) {
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public void map(LongWritable key, Text value,
+                OutputCollector<BytesWritable, Put> output, Reporter reporter)
+                throws IOException {
+            String vals[] = value.toString().split(",");
+            Put put = new Put(Bytes.toBytes(vals[0]));
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                put.add(Bytes.toBytes("my_family"),
+                        Bytes.toBytes(pair[0]),
+                        Bytes.toBytes(pair[1]));
+            }
+            output.collect(null, put);
+        }
+    }
+
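+    /**
+     * Same as MapHCatWrite except that it throws on the record with key 3, simulating a failed
+     * task so that the write transaction is aborted.
+     */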
+    static class MapWriteAbortTransaction extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+
+        @Override
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            OutputJobInfo jobInfo = (OutputJobInfo)HCatUtil.deserialize(context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+            HCatRecord record = new DefaultHCatRecord(3);
+            HCatSchema schema = jobInfo.getOutputSchema();
+            String vals[] = value.toString().split(",");
+            record.setInteger("key", schema, Integer.parseInt(vals[0]));
+            if (vals[0].equals("3")) {
+                throw new IOException("Failing map to test abort");
+            }
+            for (int i = 1; i < vals.length; i++) {
+                String pair[] = vals[i].split(":");
+                record.set(pair[0], schema, pair[1]);
+            }
+            context.write(null, record);
+        }
+
+    }
+
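+    /**
+     * Read-side mapper that overrides run() so the task fails if the input format hands it any
+     * records; used to assert that rows from the aborted transaction are not returned.
+     */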
+    static class MapReadAbortedTransaction
+            extends
+            Mapper<ImmutableBytesWritable, HCatRecord, WritableComparable<?>, Text> {
+
+        @Override
+        public void run(Context context) throws IOException,
+                InterruptedException {
+            setup(context);
+            if (context.nextKeyValue()) {
+                map(context.getCurrentKey(), context.getCurrentValue(), context);
+                while (context.nextKeyValue()) {
+                    map(context.getCurrentKey(), context.getCurrentValue(),
+                            context);
+                }
+                throw new IOException("There should have been no records");
+            }
+            cleanup(context);
+        }
+
+        @Override
+        public void map(ImmutableBytesWritable key, HCatRecord value,
+                Context context) throws IOException, InterruptedException {
+            System.out.println("HCat record value: " + value);
+        }
+    }
+}