You are viewing a plain-text version of this content. (The link to the canonical version is not preserved in this copy.)
Posted to hcatalog-commits@incubator.apache.org by ga...@apache.org on 2012/03/20 16:26:50 UTC
svn commit: r1302978 - in /incubator/hcatalog/branches/branch-0.4: ./
src/java/org/apache/hcatalog/pig/
storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/
storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/
Author: gates
Date: Tue Mar 20 16:26:49 2012
New Revision: 1302978
URL: http://svn.apache.org/viewvc?rev=1302978&view=rev
Log:
HCATALOG-302: Unable to write to HBase channel (HBaseHCatStorageHandler class not found)
Modified:
incubator/hcatalog/branches/branch-0.4/CHANGES.txt
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java
incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java
incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java
Modified: incubator/hcatalog/branches/branch-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/CHANGES.txt?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.4/CHANGES.txt Tue Mar 20 16:26:49 2012
@@ -30,6 +30,8 @@ Release 0.4.1 - Unreleased
BUG FIXES
HCAT-319 Cleanup of 0.3 mapred classes (khorgath via gates)
+ HCAT-302 unable to write to hbase channel. HBaseHCatStorageHandler class not found (rohini via gates)
+
Release 0.4.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java Tue Mar 20 16:26:49 2012
@@ -70,27 +70,30 @@ public class HCatStorer extends HCatBase
@Override
public void setStoreLocation(String location, Job job) throws IOException {
-
job.getConfiguration().set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + sign);
Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign});
String[] userStr = location.split("\\.");
OutputJobInfo outputJobInfo;
- if(userStr.length == 2) {
- outputJobInfo = OutputJobInfo.create(userStr[0],
+ String outInfoString = p.getProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+ if (outInfoString != null) {
+ outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(outInfoString);
+ } else {
+ if(userStr.length == 2) {
+ outputJobInfo = OutputJobInfo.create(userStr[0],
userStr[1],
partitions);
- } else if(userStr.length == 1) {
- outputJobInfo = OutputJobInfo.create(null,
+ } else if(userStr.length == 1) {
+ outputJobInfo = OutputJobInfo.create(null,
userStr[0],
partitions);
- } else {
- throw new FrontendException("location "+location+" is invalid. It must be of the form [db.]table", PigHCatUtil.PIG_EXCEPTION_CODE);
+ } else {
+ throw new FrontendException("location "+location+" is invalid. It must be of the form [db.]table", PigHCatUtil.PIG_EXCEPTION_CODE);
+ }
}
-
Configuration config = job.getConfiguration();
if(!HCatUtil.checkJobContextIfRunningFromBackend(job)){
@@ -123,6 +126,7 @@ public class HCatStorer extends HCatBase
PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_TOKEN_SIGNATURE);
PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE);
PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM);
+ PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_OUTPUT_INFO);
p.setProperty(COMPUTED_OUTPUT_SCHEMA,ObjectSerializer.serialize(computedSchema));
Modified: incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java Tue Mar 20 16:26:49 2012
@@ -61,9 +61,6 @@ class HBaseBulkOutputFormat extends HBas
@Override
public void checkOutputSpecs(FileSystem ignored, JobConf job)
throws IOException {
- job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(Put.class);
- job.setOutputCommitter(HBaseBulkOutputCommitter.class);
baseOutputFormat.checkOutputSpecs(ignored, job);
HBaseUtil.addHBaseDelegationToken(job);
addJTDelegationToken(job);
@@ -73,6 +70,8 @@ class HBaseBulkOutputFormat extends HBas
public RecordWriter<WritableComparable<?>, Put> getRecordWriter(
FileSystem ignored, JobConf job, String name, Progressable progress)
throws IOException {
+ job.setOutputKeyClass(ImmutableBytesWritable.class);
+ job.setOutputValueClass(Put.class);
long version = HBaseRevisionManagerUtil.getOutputRevision(job);
return new HBaseBulkRecordWriter(baseOutputFormat.getRecordWriter(
ignored, job, name, progress), version);
@@ -188,11 +187,21 @@ class HBaseBulkOutputFormat extends HBas
try {
Configuration conf = jobContext.getConfiguration();
Path srcPath = FileOutputFormat.getOutputPath(jobContext.getJobConf());
+ if (!FileSystem.get(conf).exists(srcPath)) {
+ throw new IOException("Failed to bulk import hfiles. " +
+ "Intermediate data directory is cleaned up or missing. " +
+ "Please look at the bulk import job if it exists for failure reason");
+ }
Path destPath = new Path(srcPath.getParent(), srcPath.getName() + "_hfiles");
- ImportSequenceFile.runJob(jobContext,
+ boolean success = ImportSequenceFile.runJob(jobContext,
conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY),
srcPath,
destPath);
+ if(!success) {
+ cleanIntermediate(jobContext);
+ throw new IOException("Failed to bulk import hfiles." +
+ " Please look at the bulk import job for failure reason");
+ }
rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
rm.commitWriteTransaction(HBaseRevisionManagerUtil.getWriteTransaction(conf));
cleanIntermediate(jobContext);
Modified: incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java Tue Mar 20 16:26:49 2012
@@ -63,9 +63,6 @@ class HBaseDirectOutputFormat extends HB
@Override
public void checkOutputSpecs(FileSystem ignored, JobConf job)
throws IOException {
- job.setOutputCommitter(HBaseDirectOutputCommitter.class);
- job.setIfUnset(TableOutputFormat.OUTPUT_TABLE,
- job.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY));
outputFormat.checkOutputSpecs(ignored, job);
HBaseUtil.addHBaseDelegationToken(job);
}
Modified: incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java Tue Mar 20 16:26:49 2012
@@ -19,7 +19,6 @@
package org.apache.hcatalog.hbase;
import java.io.IOException;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -27,6 +26,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.MasterNot
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
@@ -48,14 +49,15 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.HBaseBulkOutputFormat.HBaseBulkOutputCommitter;
+import org.apache.hcatalog.hbase.HBaseDirectOutputFormat.HBaseDirectOutputCommitter;
import org.apache.hcatalog.hbase.snapshot.RevisionManager;
import org.apache.hcatalog.hbase.snapshot.Transaction;
import org.apache.hcatalog.hbase.snapshot.ZKBasedRevisionManager;
@@ -68,20 +70,20 @@ import org.apache.thrift.TBase;
import org.apache.zookeeper.ZooKeeper;
import com.facebook.fb303.FacebookBase;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* This class HBaseHCatStorageHandler provides functionality to create HBase
* tables through HCatalog. The implementation is very similar to the
* HiveHBaseStorageHandler, with more details to suit HCatalog.
*/
-//TODO remove serializable when HCATALOG-282 is fixed
-public class HBaseHCatStorageHandler extends HCatStorageHandler implements HiveMetaHook, Serializable {
+public class HBaseHCatStorageHandler extends HCatStorageHandler implements HiveMetaHook, Configurable {
public final static String DEFAULT_PREFIX = "default.";
private final static String PROPERTY_INT_OUTPUT_LOCATION = "hcat.hbase.mapreduce.intermediateOutputLocation";
- private transient Configuration hbaseConf;
- private transient HBaseAdmin admin;
+ private Configuration hbaseConf;
+ private HBaseAdmin admin;
@Override
public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
@@ -96,20 +98,32 @@ public class HBaseHCatStorageHandler ext
jobProperties.put(TableInputFormat.INPUT_TABLE, qualifiedTableName);
Configuration jobConf = getConf();
+ addHbaseResources(jobConf, jobProperties);
+ Configuration copyOfConf = new Configuration(jobConf);
+ HBaseConfiguration.addHbaseResources(copyOfConf);
+ //Getting hbase delegation token in getInputSplits does not work with PIG. So need to
+ //do it here
+ if (jobConf instanceof JobConf) {
+ HBaseUtil.addHBaseDelegationToken((JobConf)jobConf);
+ }
+
String outputSchema = jobConf.get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
jobProperties.put(TableInputFormat.SCAN_COLUMNS, getScanColumns(tableInfo, outputSchema));
String serSnapshot = (String) inputJobInfo.getProperties().get(
HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
if (serSnapshot == null) {
- Configuration conf = addHbaseResources(jobConf);
- HCatTableSnapshot snapshot = HBaseRevisionManagerUtil.createSnapshot(conf,
+ HCatTableSnapshot snapshot = HBaseRevisionManagerUtil.createSnapshot(copyOfConf,
qualifiedTableName, tableInfo);
jobProperties.put(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY,
HCatUtil.serialize(snapshot));
}
- addHbaseResources(jobConf, jobProperties);
+ //This adds it directly to the jobConf. Setting in jobProperties does not get propagated
+ //to JobConf as of now as the jobProperties is maintained per partition
+ //TODO: Remove when HCAT-308 is fixed
+ addOutputDependencyJars(jobConf);
+ jobProperties.put("tmpjars", jobConf.get("tmpjars"));
} catch (IOException e) {
throw new IllegalStateException("Error while configuring job properties", e);
@@ -128,33 +142,50 @@ public class HBaseHCatStorageHandler ext
HCatTableInfo tableInfo = outputJobInfo.getTableInfo();
String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo);
jobProperties.put(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, qualifiedTableName);
+ jobProperties.put(TableOutputFormat.OUTPUT_TABLE, qualifiedTableName);
Configuration jobConf = getConf();
+ addHbaseResources(jobConf, jobProperties);
+
+ Configuration copyOfConf = new Configuration(jobConf);
+ HBaseConfiguration.addHbaseResources(copyOfConf);
+
String txnString = outputJobInfo.getProperties().getProperty(
HBaseConstants.PROPERTY_WRITE_TXN_KEY);
- if (txnString == null) {
- Configuration conf = addHbaseResources(jobConf);
- Transaction txn = HBaseRevisionManagerUtil.beginWriteTransaction(qualifiedTableName, tableInfo, conf);
+ String jobTxnString = jobConf.get(HBaseConstants.PROPERTY_WRITE_TXN_KEY);
+ //Pig makes 3 calls to HCatOutputFormat.setOutput(HCatStorer) with different JobConf
+ //which leads to creating 2 transactions.
+ //So apart from fixing HCatStorer to pass same OutputJobInfo, making the call idempotent for other
+ //cases which might call multiple times but with same JobConf.
+ Transaction txn = null;
+ if (txnString == null && jobTxnString == null) {
+ txn = HBaseRevisionManagerUtil.beginWriteTransaction(qualifiedTableName, tableInfo, copyOfConf);
+ String serializedTxn = HCatUtil.serialize(txn);
outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
- HCatUtil.serialize(txn));
-
- if (isBulkMode(outputJobInfo) && !(outputJobInfo.getProperties()
- .containsKey(PROPERTY_INT_OUTPUT_LOCATION))) {
- String tableLocation = tableInfo.getTableLocation();
- String location = new Path(tableLocation, "REVISION_" + txn.getRevisionNumber())
- .toString();
- outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION,
- location);
- // We are writing out an intermediate sequenceFile hence
- // location is not passed in OutputJobInfo.getLocation()
- // TODO replace this with a mapreduce constant when available
- jobProperties.put("mapred.output.dir", location);
- }
+ serializedTxn);
+ jobProperties.put(HBaseConstants.PROPERTY_WRITE_TXN_KEY, serializedTxn);
+ } else {
+ txnString = (txnString == null) ? jobTxnString : txnString;
+ txn = (Transaction) HCatUtil.deserialize(txnString);
+ outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
+ txnString);
+ jobProperties.put(HBaseConstants.PROPERTY_WRITE_TXN_KEY, txnString);
+ }
+ if (isBulkMode(outputJobInfo)) {
+ String tableLocation = tableInfo.getTableLocation();
+ String location = new Path(tableLocation, "REVISION_" + txn.getRevisionNumber())
+ .toString();
+ outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION, location);
+ // We are writing out an intermediate sequenceFile hence
+ // location is not passed in OutputJobInfo.getLocation()
+ // TODO replace this with a mapreduce constant when available
+ jobProperties.put("mapred.output.dir", location);
+ jobProperties.put("mapred.output.committer.class", HBaseBulkOutputCommitter.class.getName());
+ } else {
+ jobProperties.put("mapred.output.committer.class", HBaseDirectOutputCommitter.class.getName());
}
- jobProperties
- .put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
- addHbaseResources(jobConf, jobProperties);
+ jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
addOutputDependencyJars(jobConf);
jobProperties.put("tmpjars", jobConf.get("tmpjars"));
@@ -429,7 +460,10 @@ public class HBaseHCatStorageHandler ext
@Override
public void setConf(Configuration conf) {
- hbaseConf = HBaseConfiguration.create(conf);
+ //Not cloning as we want to set tmpjars on it. Putting in jobProperties does not
+ //get propagated to JobConf in case of InputFormat as they are maintained per partition.
+ //Also we need to add hbase delegation token to the Credentials.
+ hbaseConf = conf;
}
private void checkDeleteTable(Table table) throws MetaException {
@@ -479,8 +513,6 @@ public class HBaseHCatStorageHandler ext
*/
private void addOutputDependencyJars(Configuration conf) throws IOException {
TableMapReduceUtil.addDependencyJars(conf,
- //hadoop-core
- Writable.class,
//ZK
ZooKeeper.class,
//HBase
@@ -489,6 +521,8 @@ public class HBaseHCatStorageHandler ext
HiveException.class,
//HCatalog jar
HCatOutputFormat.class,
+ //hcat hbase storage handler jar
+ HBaseHCatStorageHandler.class,
//hive hbase storage handler jar
HBaseSerDe.class,
//hive jar
@@ -498,18 +532,9 @@ public class HBaseHCatStorageHandler ext
//hbase jar
Bytes.class,
//thrift-fb303 .jar
- FacebookBase.class);
- }
-
- /**
- * Utility method to get a new Configuration with hbase-default.xml and hbase-site.xml added
- * @param jobConf existing configuration
- * @return a new Configuration with hbase-default.xml and hbase-site.xml added
- */
- private Configuration addHbaseResources(Configuration jobConf) {
- Configuration conf = new Configuration(jobConf);
- HBaseConfiguration.addHbaseResources(conf);
- return conf;
+ FacebookBase.class,
+ //guava jar
+ ThreadFactoryBuilder.class);
}
/**
Modified: incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java Tue Mar 20 16:26:49 2012
@@ -105,7 +105,6 @@ class HBaseInputFormat implements InputF
public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
inputFormat.setConf(job);
- HBaseUtil.addHBaseDelegationToken(job);
return convertSplits(inputFormat.getSplits(HCatMapRedUtil.createJobContext(job, null,
Reporter.NULL)));
}
Modified: incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java Tue Mar 20 16:26:49 2012
@@ -80,19 +80,16 @@ class HbaseSnapshotRecordReader implemen
public void restart(byte[] firstRow) throws IOException {
allAbortedTransactions = getAbortedTransactions(Bytes.toString(htable.getTableName()), scan);
- long maxValidRevision = snapshot.getLatestRevision();
+ long maxValidRevision = getMaximumRevision(scan, snapshot);
while (allAbortedTransactions.contains(maxValidRevision)) {
maxValidRevision--;
}
- long minValidRevision = getMinimumRevision(scan, snapshot);
- while (allAbortedTransactions.contains(minValidRevision)) {
- minValidRevision--;
- }
Scan newScan = new Scan(scan);
newScan.setStartRow(firstRow);
//TODO: See if filters in 0.92 can be used to optimize the scan
//TODO: Consider create a custom snapshot filter
- newScan.setTimeRange(minValidRevision, maxValidRevision + 1);
+ //TODO: Make min revision a constant in RM
+ newScan.setTimeRange(0, maxValidRevision + 1);
newScan.setMaxVersions();
this.scanner = this.htable.getScanner(newScan);
resultItr = this.scanner.iterator();
@@ -120,16 +117,16 @@ class HbaseSnapshotRecordReader implemen
}
}
- private long getMinimumRevision(Scan scan, TableSnapshot snapshot) {
- long minRevision = snapshot.getLatestRevision();
+ private long getMaximumRevision(Scan scan, TableSnapshot snapshot) {
+ long maxRevision = 0;
byte[][] families = scan.getFamilies();
for (byte[] familyKey : families) {
String family = Bytes.toString(familyKey);
long revision = snapshot.getRevision(family);
- if (revision < minRevision)
- minRevision = revision;
+ if (revision > maxRevision)
+ maxRevision = revision;
}
- return minRevision;
+ return maxRevision;
}
/*
Modified: incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java Tue Mar 20 16:26:49 2012
@@ -54,6 +54,7 @@ import org.apache.hcatalog.common.HCatUt
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.HBaseBulkOutputFormat.HBaseBulkOutputCommitter;
import org.apache.hcatalog.hbase.TestHBaseDirectOutputFormat.MapReadAbortedTransaction;
import org.apache.hcatalog.hbase.TestHBaseDirectOutputFormat.MapWriteAbortTransaction;
import org.apache.hcatalog.hbase.snapshot.FamilyRevision;
@@ -203,6 +204,7 @@ public class TestHBaseBulkOutputFormat e
job.setOutputFormat(HBaseBulkOutputFormat.class);
org.apache.hadoop.mapred.SequenceFileOutputFormat.setOutputPath(job, interPath);
+ job.setOutputCommitter(HBaseBulkOutputCommitter.class);
//manually create transaction
RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
Modified: incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java Tue Mar 20 16:26:49 2012
@@ -138,6 +138,7 @@ public class TestHBaseDirectOutputFormat
org.apache.hadoop.mapred.TextInputFormat.setInputPaths(job, inputPath);
job.setOutputFormat(HBaseDirectOutputFormat.class);
+ job.set(TableOutputFormat.OUTPUT_TABLE, tableName);
job.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
//manually create transaction
@@ -378,7 +379,17 @@ public class TestHBaseDirectOutputFormat
TextInputFormat.setInputPaths(job, inputPath);
job.setOutputFormatClass(HCatOutputFormat.class);
HCatOutputFormat.setOutput(job, outputJobInfo);
-
+ String txnString = job.getConfiguration().get(HBaseConstants.PROPERTY_WRITE_TXN_KEY);
+ //Test passing in same jobConf or same OutputJobInfo multiple times and verify 1 transaction is created
+ //Same jobConf
+ HCatOutputFormat.setOutput(job, outputJobInfo);
+ assertEquals(txnString, job.getConfiguration().get(HBaseConstants.PROPERTY_WRITE_TXN_KEY));
+ String jobString = job.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+ //Same OutputJobInfo
+ outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(jobString);
+ Job job2 = new Job(conf);
+ HCatOutputFormat.setOutput(job2, outputJobInfo);
+ assertEquals(txnString, job2.getConfiguration().get(HBaseConstants.PROPERTY_WRITE_TXN_KEY));
job.setMapOutputKeyClass(BytesWritable.class);
job.setMapOutputValueClass(HCatRecord.class);
job.setOutputKeyClass(BytesWritable.class);
Modified: incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java?rev=1302978&r1=1302977&r2=1302978&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java Tue Mar 20 16:26:49 2012
@@ -449,11 +449,12 @@ public class TestHBaseInputFormat extend
assertTrue(doesTableExist);
populateHBaseTable(tableName, 2);
- populateHBaseTableQualifier1(tableName, 3, null); //Running transaction
- populateHBaseTableQualifier1(tableName, 4, Boolean.FALSE); //Aborted transaction
- populateHBaseTableQualifier1(tableName, 5, Boolean.TRUE); //Committed transaction
- populateHBaseTableQualifier1(tableName, 6, null); //Running Transaction
- populateHBaseTableQualifier1(tableName, 7, Boolean.FALSE); //Aborted Transaction
+ populateHBaseTableQualifier1(tableName, 3, Boolean.TRUE); //Committed transaction
+ populateHBaseTableQualifier1(tableName, 4, null); //Running transaction
+ populateHBaseTableQualifier1(tableName, 5, Boolean.FALSE); //Aborted transaction
+ populateHBaseTableQualifier1(tableName, 6, Boolean.TRUE); //Committed transaction
+ populateHBaseTableQualifier1(tableName, 7, null); //Running Transaction
+ populateHBaseTableQualifier1(tableName, 8, Boolean.FALSE); //Aborted Transaction
Configuration conf = new Configuration(hcatConf);
conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
@@ -588,7 +589,7 @@ public class TestHBaseInputFormat extend
System.out.println("HCat record value" + value.toString());
boolean correctValues = (value.size() == 3)
&& (value.get(0).toString()).equalsIgnoreCase("testRow")
- && (value.get(1).toString()).equalsIgnoreCase("textValue-2")
+ && (value.get(1).toString()).equalsIgnoreCase("textValue-3")
&& (value.get(2).toString()).equalsIgnoreCase("textValue-2");
if (correctValues == false) {