You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by ga...@apache.org on 2012/03/20 16:26:50 UTC

svn commit: r1302978 - in /incubator/hcatalog/branches/branch-0.4: ./ src/java/org/apache/hcatalog/pig/ storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/

Author: gates
Date: Tue Mar 20 16:26:49 2012
New Revision: 1302978

URL: http://svn.apache.org/viewvc?rev=1302978&view=rev
Log:
HCATALOG-302 unable to write to hbase channel. HBaseHCatStorageHandler class not found

Modified:
    incubator/hcatalog/branches/branch-0.4/CHANGES.txt
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java
    incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
    incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
    incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
    incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
    incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java
    incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
    incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
    incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java

Modified: incubator/hcatalog/branches/branch-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/CHANGES.txt?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.4/CHANGES.txt Tue Mar 20 16:26:49 2012
@@ -30,6 +30,8 @@ Release 0.4.1 - Unreleased
   BUG FIXES
   HCAT-319 Cleanup of 0.3 mapred classes (khorgath via gates)
 
+  HCAT-302 unable to write to hbase channel. HBaseHCatStorageHandler class not found (rohini via gates)
+
 Release 0.4.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java Tue Mar 20 16:26:49 2012
@@ -70,27 +70,30 @@ public class HCatStorer extends HCatBase
 
   @Override
   public void setStoreLocation(String location, Job job) throws IOException {
-
     job.getConfiguration().set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + sign);
     Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign});
 
     String[] userStr = location.split("\\.");
     OutputJobInfo outputJobInfo;
 
-    if(userStr.length == 2) {
-      outputJobInfo = OutputJobInfo.create(userStr[0],
+    String outInfoString = p.getProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+    if (outInfoString != null) {
+      outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(outInfoString);
+    } else {
+      if(userStr.length == 2) {
+        outputJobInfo = OutputJobInfo.create(userStr[0],
                                                              userStr[1],
                                                              partitions);
-    } else if(userStr.length == 1) {
-      outputJobInfo = OutputJobInfo.create(null,
+      } else if(userStr.length == 1) {
+        outputJobInfo = OutputJobInfo.create(null,
                                                              userStr[0],
                                                              partitions);
-    } else {
-      throw new FrontendException("location "+location+" is invalid. It must be of the form [db.]table", PigHCatUtil.PIG_EXCEPTION_CODE);
+      } else {
+        throw new FrontendException("location "+location+" is invalid. It must be of the form [db.]table", PigHCatUtil.PIG_EXCEPTION_CODE);
+      }
     }
 
 
-
     Configuration config = job.getConfiguration();
     if(!HCatUtil.checkJobContextIfRunningFromBackend(job)){
 
@@ -123,6 +126,7 @@ public class HCatStorer extends HCatBase
       PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_TOKEN_SIGNATURE);
       PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE);
       PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM);
+      PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_OUTPUT_INFO);
 
       p.setProperty(COMPUTED_OUTPUT_SCHEMA,ObjectSerializer.serialize(computedSchema));
 

Modified: incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java Tue Mar 20 16:26:49 2012
@@ -61,9 +61,6 @@ class HBaseBulkOutputFormat extends HBas
     @Override
     public void checkOutputSpecs(FileSystem ignored, JobConf job)
             throws IOException {
-        job.setOutputKeyClass(ImmutableBytesWritable.class);
-        job.setOutputValueClass(Put.class);
-        job.setOutputCommitter(HBaseBulkOutputCommitter.class);
         baseOutputFormat.checkOutputSpecs(ignored, job);
         HBaseUtil.addHBaseDelegationToken(job);
         addJTDelegationToken(job);
@@ -73,6 +70,8 @@ class HBaseBulkOutputFormat extends HBas
     public RecordWriter<WritableComparable<?>, Put> getRecordWriter(
             FileSystem ignored, JobConf job, String name, Progressable progress)
             throws IOException {
+        job.setOutputKeyClass(ImmutableBytesWritable.class);
+        job.setOutputValueClass(Put.class);
         long version = HBaseRevisionManagerUtil.getOutputRevision(job);
         return new HBaseBulkRecordWriter(baseOutputFormat.getRecordWriter(
                 ignored, job, name, progress), version);
@@ -188,11 +187,21 @@ class HBaseBulkOutputFormat extends HBas
             try {
                 Configuration conf = jobContext.getConfiguration();
                 Path srcPath = FileOutputFormat.getOutputPath(jobContext.getJobConf());
+                if (!FileSystem.get(conf).exists(srcPath)) {
+                    throw new IOException("Failed to bulk import hfiles. " +
+                    		"Intermediate data directory is cleaned up or missing. " +
+                    		"Please look at the bulk import job if it exists for failure reason");
+                }
                 Path destPath = new Path(srcPath.getParent(), srcPath.getName() + "_hfiles");
-                ImportSequenceFile.runJob(jobContext,
+                boolean success = ImportSequenceFile.runJob(jobContext,
                                 conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY),
                                 srcPath,
                                 destPath);
+                if(!success) {
+                    cleanIntermediate(jobContext);
+                    throw new IOException("Failed to bulk import hfiles." +
+                    		" Please look at the bulk import job for failure reason");
+                }
                 rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
                 rm.commitWriteTransaction(HBaseRevisionManagerUtil.getWriteTransaction(conf));
                 cleanIntermediate(jobContext);

Modified: incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java Tue Mar 20 16:26:49 2012
@@ -63,9 +63,6 @@ class HBaseDirectOutputFormat extends HB
     @Override
     public void checkOutputSpecs(FileSystem ignored, JobConf job)
             throws IOException {
-        job.setOutputCommitter(HBaseDirectOutputCommitter.class);
-        job.setIfUnset(TableOutputFormat.OUTPUT_TABLE,
-                job.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY));
         outputFormat.checkOutputSpecs(ignored, job);
         HBaseUtil.addHBaseDelegationToken(job);
     }

Modified: incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java Tue Mar 20 16:26:49 2012
@@ -19,7 +19,6 @@
 package org.apache.hcatalog.hbase;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -27,6 +26,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.MasterNot
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -48,14 +49,15 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
 import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.HBaseBulkOutputFormat.HBaseBulkOutputCommitter;
+import org.apache.hcatalog.hbase.HBaseDirectOutputFormat.HBaseDirectOutputCommitter;
 import org.apache.hcatalog.hbase.snapshot.RevisionManager;
 import org.apache.hcatalog.hbase.snapshot.Transaction;
 import org.apache.hcatalog.hbase.snapshot.ZKBasedRevisionManager;
@@ -68,20 +70,20 @@ import org.apache.thrift.TBase;
 import org.apache.zookeeper.ZooKeeper;
 
 import com.facebook.fb303.FacebookBase;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * This class HBaseHCatStorageHandler provides functionality to create HBase
  * tables through HCatalog. The implementation is very similar to the
  * HiveHBaseStorageHandler, with more details to suit HCatalog.
  */
-//TODO remove serializable when HCATALOG-282 is fixed
-public class HBaseHCatStorageHandler extends HCatStorageHandler implements HiveMetaHook, Serializable {
+public class HBaseHCatStorageHandler extends HCatStorageHandler implements HiveMetaHook, Configurable {
 
     public final static String DEFAULT_PREFIX = "default.";
     private final static String PROPERTY_INT_OUTPUT_LOCATION = "hcat.hbase.mapreduce.intermediateOutputLocation";
 
-    private transient Configuration      hbaseConf;
-    private transient HBaseAdmin         admin;
+    private Configuration hbaseConf;
+    private HBaseAdmin admin;
 
     @Override
     public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
@@ -96,20 +98,32 @@ public class HBaseHCatStorageHandler ext
             jobProperties.put(TableInputFormat.INPUT_TABLE, qualifiedTableName);
 
             Configuration jobConf = getConf();
+            addHbaseResources(jobConf, jobProperties);
+            Configuration copyOfConf = new Configuration(jobConf);
+            HBaseConfiguration.addHbaseResources(copyOfConf);
+            //Fetching the HBase delegation token in HBaseInputFormat.getSplits does not work
+            //with Pig, so it has to be done here instead.
+            if (jobConf instanceof JobConf) {
+                HBaseUtil.addHBaseDelegationToken((JobConf)jobConf);
+            }
+
             String outputSchema = jobConf.get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
             jobProperties.put(TableInputFormat.SCAN_COLUMNS, getScanColumns(tableInfo, outputSchema));
 
             String serSnapshot = (String) inputJobInfo.getProperties().get(
                     HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
             if (serSnapshot == null) {
-                Configuration conf = addHbaseResources(jobConf);
-                HCatTableSnapshot snapshot = HBaseRevisionManagerUtil.createSnapshot(conf,
+                HCatTableSnapshot snapshot = HBaseRevisionManagerUtil.createSnapshot(copyOfConf,
                         qualifiedTableName, tableInfo);
                 jobProperties.put(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY,
                         HCatUtil.serialize(snapshot));
             }
 
-            addHbaseResources(jobConf, jobProperties);
+            //This adds it directly to the jobConf. Setting in jobProperties does not get propagated
+            //to JobConf as of now as the jobProperties is maintained per partition
+            //TODO: Remove when HCAT-308 is fixed
+            addOutputDependencyJars(jobConf);
+            jobProperties.put("tmpjars", jobConf.get("tmpjars"));
 
         } catch (IOException e) {
             throw new IllegalStateException("Error while configuring job properties", e);
@@ -128,33 +142,50 @@ public class HBaseHCatStorageHandler ext
             HCatTableInfo tableInfo = outputJobInfo.getTableInfo();
             String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo);
             jobProperties.put(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, qualifiedTableName);
+            jobProperties.put(TableOutputFormat.OUTPUT_TABLE, qualifiedTableName);
 
             Configuration jobConf = getConf();
+            addHbaseResources(jobConf, jobProperties);
+
+            Configuration copyOfConf = new Configuration(jobConf);
+            HBaseConfiguration.addHbaseResources(copyOfConf);
+
             String txnString = outputJobInfo.getProperties().getProperty(
                     HBaseConstants.PROPERTY_WRITE_TXN_KEY);
-            if (txnString == null) {
-                Configuration conf = addHbaseResources(jobConf);
-                Transaction txn = HBaseRevisionManagerUtil.beginWriteTransaction(qualifiedTableName, tableInfo, conf);
+            String jobTxnString = jobConf.get(HBaseConstants.PROPERTY_WRITE_TXN_KEY);
+            //Pig (from HCatStorer) calls HCatOutputFormat.setOutput 3 times with different JobConfs,
+            //which leads to 2 transactions being created.
+            //So, apart from fixing HCatStorer to pass the same OutputJobInfo, this call is made idempotent
+            //for other callers that might invoke it multiple times with the same JobConf.
+            Transaction txn = null;
+            if (txnString == null && jobTxnString == null) {
+                txn = HBaseRevisionManagerUtil.beginWriteTransaction(qualifiedTableName, tableInfo, copyOfConf);
+                String serializedTxn = HCatUtil.serialize(txn);
                 outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
-                        HCatUtil.serialize(txn));
-
-                if (isBulkMode(outputJobInfo) && !(outputJobInfo.getProperties()
-                                .containsKey(PROPERTY_INT_OUTPUT_LOCATION))) {
-                    String tableLocation = tableInfo.getTableLocation();
-                    String location = new Path(tableLocation, "REVISION_" + txn.getRevisionNumber())
-                            .toString();
-                    outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION,
-                            location);
-                    // We are writing out an intermediate sequenceFile hence
-                    // location is not passed in OutputJobInfo.getLocation()
-                    // TODO replace this with a mapreduce constant when available
-                    jobProperties.put("mapred.output.dir", location);
-                }
+                        serializedTxn);
+                jobProperties.put(HBaseConstants.PROPERTY_WRITE_TXN_KEY, serializedTxn);
+            } else {
+                txnString = (txnString == null) ? jobTxnString : txnString;
+                txn = (Transaction) HCatUtil.deserialize(txnString);
+                outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
+                        txnString);
+                jobProperties.put(HBaseConstants.PROPERTY_WRITE_TXN_KEY, txnString);
+            }
+            if (isBulkMode(outputJobInfo)) {
+                String tableLocation = tableInfo.getTableLocation();
+                String location = new Path(tableLocation, "REVISION_" + txn.getRevisionNumber())
+                        .toString();
+                outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION, location);
+                // We are writing out an intermediate sequenceFile hence
+                // location is not passed in OutputJobInfo.getLocation()
+                // TODO replace this with a mapreduce constant when available
+                jobProperties.put("mapred.output.dir", location);
+                jobProperties.put("mapred.output.committer.class", HBaseBulkOutputCommitter.class.getName());
+            } else {
+                jobProperties.put("mapred.output.committer.class", HBaseDirectOutputCommitter.class.getName());
             }
 
-            jobProperties
-                    .put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
-            addHbaseResources(jobConf, jobProperties);
+            jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
             addOutputDependencyJars(jobConf);
             jobProperties.put("tmpjars", jobConf.get("tmpjars"));
 
@@ -429,7 +460,10 @@ public class HBaseHCatStorageHandler ext
 
     @Override
     public void setConf(Configuration conf) {
-        hbaseConf = HBaseConfiguration.create(conf);
+        //Not cloning as we want to set tmpjars on it. Putting in jobProperties does not
+        //get propagated to JobConf in case of InputFormat as they are maintained per partition.
+        //Also we need to add hbase delegation token to the Credentials.
+        hbaseConf = conf;
     }
 
     private void checkDeleteTable(Table table) throws MetaException {
@@ -479,8 +513,6 @@ public class HBaseHCatStorageHandler ext
      */
     private void addOutputDependencyJars(Configuration conf) throws IOException {
         TableMapReduceUtil.addDependencyJars(conf,
-                //hadoop-core
-                Writable.class,
                 //ZK
                 ZooKeeper.class,
                 //HBase
@@ -489,6 +521,8 @@ public class HBaseHCatStorageHandler ext
                 HiveException.class,
                 //HCatalog jar
                 HCatOutputFormat.class,
+                //hcat hbase storage handler jar
+                HBaseHCatStorageHandler.class,
                 //hive hbase storage handler jar
                 HBaseSerDe.class,
                 //hive jar
@@ -498,18 +532,9 @@ public class HBaseHCatStorageHandler ext
                 //hbase jar
                 Bytes.class,
                 //thrift-fb303 .jar
-                FacebookBase.class);
-    }
-
-    /**
-     * Utility method to get a new Configuration with hbase-default.xml and hbase-site.xml added
-     * @param jobConf existing configuration
-     * @return a new Configuration with hbase-default.xml and hbase-site.xml added
-     */
-    private Configuration addHbaseResources(Configuration jobConf) {
-        Configuration conf = new Configuration(jobConf);
-        HBaseConfiguration.addHbaseResources(conf);
-        return conf;
+                FacebookBase.class,
+                //guava jar
+                ThreadFactoryBuilder.class);
     }
 
     /**

Modified: incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java Tue Mar 20 16:26:49 2012
@@ -105,7 +105,6 @@ class HBaseInputFormat implements InputF
     public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits)
             throws IOException {
         inputFormat.setConf(job);
-        HBaseUtil.addHBaseDelegationToken(job);
         return convertSplits(inputFormat.getSplits(HCatMapRedUtil.createJobContext(job, null,
                 Reporter.NULL)));
     }

Modified: incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java Tue Mar 20 16:26:49 2012
@@ -80,19 +80,16 @@ class HbaseSnapshotRecordReader implemen
 
     public void restart(byte[] firstRow) throws IOException {
         allAbortedTransactions = getAbortedTransactions(Bytes.toString(htable.getTableName()), scan);
-        long maxValidRevision = snapshot.getLatestRevision();
+        long maxValidRevision = getMaximumRevision(scan, snapshot);
         while (allAbortedTransactions.contains(maxValidRevision)) {
             maxValidRevision--;
         }
-        long minValidRevision = getMinimumRevision(scan, snapshot);
-        while (allAbortedTransactions.contains(minValidRevision)) {
-            minValidRevision--;
-        }
         Scan newScan = new Scan(scan);
         newScan.setStartRow(firstRow);
         //TODO: See if filters in 0.92 can be used to optimize the scan
         //TODO: Consider create a custom snapshot filter
-        newScan.setTimeRange(minValidRevision, maxValidRevision + 1);
+        //TODO: Make min revision a constant in RM
+        newScan.setTimeRange(0, maxValidRevision + 1);
         newScan.setMaxVersions();
         this.scanner = this.htable.getScanner(newScan);
         resultItr = this.scanner.iterator();
@@ -120,16 +117,16 @@ class HbaseSnapshotRecordReader implemen
         }
     }
 
-    private long getMinimumRevision(Scan scan, TableSnapshot snapshot) {
-        long minRevision = snapshot.getLatestRevision();
+    private long getMaximumRevision(Scan scan, TableSnapshot snapshot) {
+        long maxRevision = 0;
         byte[][] families = scan.getFamilies();
         for (byte[] familyKey : families) {
             String family = Bytes.toString(familyKey);
             long revision = snapshot.getRevision(family);
-            if (revision < minRevision)
-                minRevision = revision;
+            if (revision > maxRevision)
+                maxRevision = revision;
         }
-        return minRevision;
+        return maxRevision;
     }
 
     /*

Modified: incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java Tue Mar 20 16:26:49 2012
@@ -54,6 +54,7 @@ import org.apache.hcatalog.common.HCatUt
 import org.apache.hcatalog.data.DefaultHCatRecord;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.HBaseBulkOutputFormat.HBaseBulkOutputCommitter;
 import org.apache.hcatalog.hbase.TestHBaseDirectOutputFormat.MapReadAbortedTransaction;
 import org.apache.hcatalog.hbase.TestHBaseDirectOutputFormat.MapWriteAbortTransaction;
 import org.apache.hcatalog.hbase.snapshot.FamilyRevision;
@@ -203,6 +204,7 @@ public class TestHBaseBulkOutputFormat e
 
         job.setOutputFormat(HBaseBulkOutputFormat.class);
         org.apache.hadoop.mapred.SequenceFileOutputFormat.setOutputPath(job, interPath);
+        job.setOutputCommitter(HBaseBulkOutputCommitter.class);
 
         //manually create transaction
         RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);

Modified: incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java Tue Mar 20 16:26:49 2012
@@ -138,6 +138,7 @@ public class TestHBaseDirectOutputFormat
         org.apache.hadoop.mapred.TextInputFormat.setInputPaths(job, inputPath);
 
         job.setOutputFormat(HBaseDirectOutputFormat.class);
+        job.set(TableOutputFormat.OUTPUT_TABLE, tableName);
         job.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
 
         //manually create transaction
@@ -378,7 +379,17 @@ public class TestHBaseDirectOutputFormat
         TextInputFormat.setInputPaths(job, inputPath);
         job.setOutputFormatClass(HCatOutputFormat.class);
         HCatOutputFormat.setOutput(job, outputJobInfo);
-
+        String txnString = job.getConfiguration().get(HBaseConstants.PROPERTY_WRITE_TXN_KEY);
+        //Test passing in same jobConf or same OutputJobInfo multiple times and verify 1 transaction is created
+        //Same jobConf
+        HCatOutputFormat.setOutput(job, outputJobInfo);
+        assertEquals(txnString, job.getConfiguration().get(HBaseConstants.PROPERTY_WRITE_TXN_KEY));
+        String jobString = job.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+        //Same OutputJobInfo
+        outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(jobString);
+        Job job2 = new Job(conf);
+        HCatOutputFormat.setOutput(job2, outputJobInfo);
+        assertEquals(txnString, job2.getConfiguration().get(HBaseConstants.PROPERTY_WRITE_TXN_KEY));
         job.setMapOutputKeyClass(BytesWritable.class);
         job.setMapOutputValueClass(HCatRecord.class);
         job.setOutputKeyClass(BytesWritable.class);

Modified: incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java Tue Mar 20 16:26:49 2012
@@ -449,11 +449,12 @@ public class TestHBaseInputFormat extend
         assertTrue(doesTableExist);
 
         populateHBaseTable(tableName, 2);
-        populateHBaseTableQualifier1(tableName, 3, null); //Running transaction
-        populateHBaseTableQualifier1(tableName, 4, Boolean.FALSE);  //Aborted transaction
-        populateHBaseTableQualifier1(tableName, 5, Boolean.TRUE); //Committed transaction
-        populateHBaseTableQualifier1(tableName, 6, null); //Running Transaction
-        populateHBaseTableQualifier1(tableName, 7, Boolean.FALSE); //Aborted Transaction
+        populateHBaseTableQualifier1(tableName, 3, Boolean.TRUE); //Committed transaction
+        populateHBaseTableQualifier1(tableName, 4, null); //Running transaction
+        populateHBaseTableQualifier1(tableName, 5, Boolean.FALSE);  //Aborted transaction
+        populateHBaseTableQualifier1(tableName, 6, Boolean.TRUE); //Committed transaction
+        populateHBaseTableQualifier1(tableName, 7, null); //Running Transaction
+        populateHBaseTableQualifier1(tableName, 8, Boolean.FALSE); //Aborted Transaction
 
         Configuration conf = new Configuration(hcatConf);
         conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
@@ -588,7 +589,7 @@ public class TestHBaseInputFormat extend
             System.out.println("HCat record value" + value.toString());
             boolean correctValues = (value.size() == 3)
                     && (value.get(0).toString()).equalsIgnoreCase("testRow")
-                    && (value.get(1).toString()).equalsIgnoreCase("textValue-2")
+                    && (value.get(1).toString()).equalsIgnoreCase("textValue-3")
                     && (value.get(2).toString()).equalsIgnoreCase("textValue-2");
 
             if (correctValues == false) {