Posted to hcatalog-commits@incubator.apache.org by to...@apache.org on 2012/03/03 02:56:06 UTC
svn commit: r1296568 [2/3] - in /incubator/hcatalog/trunk: ./
src/java/org/apache/hcatalog/mapreduce/
storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/
storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/
storage-drivers/hbase/src...
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java?rev=1296568&r1=1296567&r2=1296568&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java Sat Mar 3 02:56:05 2012
@@ -20,6 +20,7 @@ package org.apache.hcatalog.hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -27,15 +28,20 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapred.RecordReader;
import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.hbase.snapshot.FamilyRevision;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
import org.apache.hcatalog.mapreduce.InputJobInfo;
@@ -43,59 +49,93 @@ import org.apache.hcatalog.mapreduce.Inp
* The Class HbaseSnapshotRecordReader implements logic for filtering records
* based on snapshot.
*/
-class HbaseSnapshotRecordReader extends TableRecordReader {
+class HbaseSnapshotRecordReader implements RecordReader<ImmutableBytesWritable, Result> {
static final Log LOG = LogFactory.getLog(HbaseSnapshotRecordReader.class);
+ private final InputJobInfo inpJobInfo;
+ private final Configuration conf;
+ private final int maxRevisions = 1;
private ResultScanner scanner;
private Scan scan;
private HTable htable;
- private ImmutableBytesWritable key;
- private Result value;
- private InputJobInfo inpJobInfo;
private TableSnapshot snapshot;
- private int maxRevisions;
private Iterator<Result> resultItr;
+ private Set<Long> allAbortedTransactions;
+ private DataOutputBuffer valueOut = new DataOutputBuffer();
+ private DataInputBuffer valueIn = new DataInputBuffer();
-
- HbaseSnapshotRecordReader(InputJobInfo inputJobInfo) throws IOException {
+ HbaseSnapshotRecordReader(InputJobInfo inputJobInfo, Configuration conf) throws IOException {
this.inpJobInfo = inputJobInfo;
- String snapshotString = inpJobInfo.getProperties().getProperty(
- HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
+ this.conf = conf;
+ String snapshotString = conf.get(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
HCatTableSnapshot hcatSnapshot = (HCatTableSnapshot) HCatUtil
.deserialize(snapshotString);
- this.snapshot = HBaseInputStorageDriver.convertSnapshot(hcatSnapshot,
+ this.snapshot = HBaseRevisionManagerUtil.convertSnapshot(hcatSnapshot,
inpJobInfo.getTableInfo());
- this.maxRevisions = 1;
}
- /* @param firstRow The first record in the split.
- /* @throws IOException
- * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#restart(byte[])
- */
- @Override
+ public void init() throws IOException {
+ restart(scan.getStartRow());
+ }
+
public void restart(byte[] firstRow) throws IOException {
+ allAbortedTransactions = getAbortedTransactions(Bytes.toString(htable.getTableName()), scan);
+ long maxValidRevision = snapshot.getLatestRevision();
+ while (allAbortedTransactions.contains(maxValidRevision)) {
+ maxValidRevision--;
+ }
+ long minValidRevision = getMinimumRevision(scan, snapshot);
+ while (allAbortedTransactions.contains(minValidRevision)) {
+ minValidRevision--;
+ }
Scan newScan = new Scan(scan);
newScan.setStartRow(firstRow);
+ //TODO: See if filters in 0.92 can be used to optimize the scan
+            //TODO: Consider creating a custom snapshot filter
+ newScan.setTimeRange(minValidRevision, maxValidRevision + 1);
+ newScan.setMaxVersions();
this.scanner = this.htable.getScanner(newScan);
resultItr = this.scanner.iterator();
}
- /* @throws IOException
- * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#init()
- */
- @Override
- public void init() throws IOException {
- restart(scan.getStartRow());
+ private Set<Long> getAbortedTransactions(String tableName, Scan scan) throws IOException {
+ Set<Long> abortedTransactions = new HashSet<Long>();
+ RevisionManager rm = null;
+ try {
+ rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+ byte[][] families = scan.getFamilies();
+ for (byte[] familyKey : families) {
+ String family = Bytes.toString(familyKey);
+ List<FamilyRevision> abortedWriteTransactions = rm.getAbortedWriteTransactions(
+ tableName, family);
+ if (abortedWriteTransactions != null) {
+ for (FamilyRevision revision : abortedWriteTransactions) {
+ abortedTransactions.add(revision.getRevision());
+ }
+ }
+ }
+ return abortedTransactions;
+ } finally {
+ HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm);
+ }
+ }
+
+ private long getMinimumRevision(Scan scan, TableSnapshot snapshot) {
+ long minRevision = snapshot.getLatestRevision();
+ byte[][] families = scan.getFamilies();
+ for (byte[] familyKey : families) {
+ String family = Bytes.toString(familyKey);
+ long revision = snapshot.getRevision(family);
+ if (revision < minRevision)
+ minRevision = revision;
+ }
+ return minRevision;
}
/*
* @param htable The HTable ( of HBase) to use for the record reader.
*
- * @see
- * org.apache.hadoop.hbase.mapreduce.TableRecordReader#setHTable(org.apache
- * .hadoop.hbase.client.HTable)
*/
- @Override
public void setHTable(HTable htable) {
this.htable = htable;
}
@@ -103,64 +143,51 @@ class HbaseSnapshotRecordReader extends
/*
* @param scan The scan to be used for reading records.
*
- * @see
- * org.apache.hadoop.hbase.mapreduce.TableRecordReader#setScan(org.apache
- * .hadoop.hbase.client.Scan)
*/
- @Override
public void setScan(Scan scan) {
this.scan = scan;
}
- /*
- * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#close()
- */
@Override
- public void close() {
- this.resultItr = null;
- this.scanner.close();
+ public ImmutableBytesWritable createKey() {
+ return new ImmutableBytesWritable();
}
- /* @return The row of hbase record.
- /* @throws IOException
- /* @throws InterruptedException
- * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#getCurrentKey()
- */
@Override
- public ImmutableBytesWritable getCurrentKey() throws IOException,
- InterruptedException {
- return key;
+ public Result createValue() {
+ return new Result();
}
- /* @return Single row result of scan of HBase table.
- /* @throws IOException
- /* @throws InterruptedException
- * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#getCurrentValue()
- */
@Override
- public Result getCurrentValue() throws IOException, InterruptedException {
- return value;
+ public long getPos() {
+ // This should be the ordinal tuple in the range;
+ // not clear how to calculate...
+ return 0;
}
- /* @return Returns whether a next key-value is available for reading.
- * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#nextKeyValue()
- */
@Override
- public boolean nextKeyValue() {
+ public float getProgress() throws IOException {
+ // Depends on the total number of tuples
+ return 0;
+ }
+ @Override
+ public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
if (this.resultItr == null) {
LOG.warn("The HBase result iterator is found null. It is possible"
+ " that the record reader has already been closed.");
} else {
-
- if (key == null)
- key = new ImmutableBytesWritable();
while (resultItr.hasNext()) {
Result temp = resultItr.next();
Result hbaseRow = prepareResult(temp.list());
if (hbaseRow != null) {
+ // Update key and value. Currently no way to avoid serialization/de-serialization
+ // as no setters are available.
key.set(hbaseRow.getRow());
- value = hbaseRow;
+ valueOut.reset();
+ hbaseRow.write(valueOut);
+ valueIn.reset(valueOut.getData(), valueOut.getLength());
+ value.readFields(valueIn);
return true;
}
@@ -185,6 +212,11 @@ class HbaseSnapshotRecordReader extends
}
String family = Bytes.toString(kv.getFamily());
+ //Ignore aborted transactions
+ if (allAbortedTransactions.contains(kv.getTimestamp())) {
+ continue;
+ }
+
long desiredTS = snapshot.getRevision(family);
if (kv.getTimestamp() <= desiredTS) {
kvs.add(kv);
@@ -213,13 +245,13 @@ class HbaseSnapshotRecordReader extends
}
}
- /* @return The progress of the record reader.
- * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#getProgress()
+ /*
+ * @see org.apache.hadoop.hbase.mapred.TableRecordReader#close()
*/
@Override
- public float getProgress() {
- // Depends on the total number of tuples
- return 0;
+ public void close() {
+ this.resultItr = null;
+ this.scanner.close();
}
}
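
Note on the change above: HbaseSnapshotRecordReader now implements the old-style org.apache.hadoop.mapred.RecordReader rather than extending the new-API TableRecordReader, so callers drive it with createKey()/createValue() and next(key, value) instead of nextKeyValue()/getCurrentKey(). A minimal consumption sketch follows; the class and method names are illustrative only and the reader is assumed to come from the corresponding mapred input format, not constructed directly.

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.mapred.RecordReader;

    // Sketch only (not part of this commit): driving an old-API record reader.
    public class SnapshotReaderUsageSketch {
        static long countRows(RecordReader<ImmutableBytesWritable, Result> reader)
                throws IOException {
            ImmutableBytesWritable key = reader.createKey();
            Result value = reader.createValue();
            long rows = 0;
            try {
                while (reader.next(key, value)) {
                    // key holds the row key, value holds the snapshot-filtered Result
                    rows++;
                }
            } finally {
                reader.close();
            }
            return rows;
        }
    }
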
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java?rev=1296568&r1=1296567&r2=1296568&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java Sat Mar 3 02:56:05 2012
@@ -25,7 +25,7 @@ package org.apache.hcatalog.hbase.snapsh
* family and stored in the corresponding znode. When a write transaction is
* committed, the transaction object is removed from the list.
*/
-class FamilyRevision implements
+public class FamilyRevision implements
Comparable<FamilyRevision> {
private long revision;
@@ -42,11 +42,11 @@ class FamilyRevision implements
this.timestamp = ts;
}
- long getRevision() {
+ public long getRevision() {
return revision;
}
- long getExpireTimestamp() {
+ public long getExpireTimestamp() {
return timestamp;
}
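
The visibility changes above make FamilyRevision and its getters public so that code outside the snapshot package (such as the record reader) can inspect aborted revisions. A minimal sketch of that usage, turning a list of aborted FamilyRevision entries into a set of revision numbers as the reader does; the class and method names are illustrative only.

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import org.apache.hcatalog.hbase.snapshot.FamilyRevision;

    // Sketch only (not part of this commit): collecting aborted revision numbers.
    public class AbortedRevisionSketch {
        static Set<Long> toRevisionSet(List<FamilyRevision> aborted) {
            Set<Long> revisions = new HashSet<Long>();
            if (aborted != null) {
                for (FamilyRevision fr : aborted) {
                    revisions.add(fr.getRevision());
                }
            }
            return revisions;
        }
    }
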
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java?rev=1296568&r1=1296567&r2=1296568&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java Sat Mar 3 02:56:05 2012
@@ -89,6 +89,17 @@ public interface RevisionManager {
throws IOException;
/**
+ * Get the list of aborted Transactions for a column family
+ *
+ * @param table the table name
+ * @param columnFamily the column family name
+ * @return a list of aborted WriteTransactions
+ * @throws java.io.IOException
+ */
+ public List<FamilyRevision> getAbortedWriteTransactions(String table,
+ String columnFamily) throws IOException;
+
+ /**
* Create the latest snapshot of the table.
*
* @param tableName
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java?rev=1296568&r1=1296567&r2=1296568&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java Sat Mar 3 02:56:05 2012
@@ -365,14 +365,8 @@ public class ZKBasedRevisionManager impl
return zkUtil.getTransactionList(path);
}
- /**
- * Get the list of aborted Transactions for a column family
- * @param table the table name
- * @param columnFamily the column family name
- * @return a list of aborted WriteTransactions
- * @throws java.io.IOException
- */
- List<FamilyRevision> getAbortedWriteTransactions(String table,
+ @Override
+ public List<FamilyRevision> getAbortedWriteTransactions(String table,
String columnFamily) throws IOException {
String path = PathUtil.getAbortInformationPath(baseDir, table, columnFamily);
return zkUtil.getTransactionList(path);
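
With getAbortedWriteTransactions() promoted to the RevisionManager interface, callers can program against the interface instead of ZKBasedRevisionManager. A minimal sketch of obtaining and closing the manager, mirroring the record reader code above; it assumes the code lives in the org.apache.hcatalog.hbase package so the HBaseRevisionManagerUtil helpers are accessible, and the class and method names are illustrative only.

    package org.apache.hcatalog.hbase;

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hcatalog.hbase.snapshot.FamilyRevision;
    import org.apache.hcatalog.hbase.snapshot.RevisionManager;

    // Sketch only (not part of this commit): looking up aborted write transactions
    // through the RevisionManager interface.
    class AbortedTransactionLookupSketch {
        static List<FamilyRevision> lookup(Configuration conf, String table, String family)
                throws IOException {
            RevisionManager rm = null;
            try {
                rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
                return rm.getAbortedWriteTransactions(table, family);
            } finally {
                HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm);
            }
        }
    }
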
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java?rev=1296568&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java Sat Mar 3 02:56:05 2012
@@ -0,0 +1,609 @@
+package org.apache.hcatalog.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hcatalog.cli.HCatDriver;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.TestHBaseDirectOutputFormat.MapReadAbortedTransaction;
+import org.apache.hcatalog.hbase.TestHBaseDirectOutputFormat.MapWriteAbortTransaction;
+import org.apache.hcatalog.hbase.snapshot.FamilyRevision;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
+import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests components of HBaseHCatStorageHandler using ManyMiniCluster.
+ * Includes ImportSequenceFile and HBaseBulkOutputFormat
+ */
+public class TestHBaseBulkOutputFormat extends SkeletonHBaseTest {
+ private final static Log LOG = LogFactory.getLog(TestHBaseBulkOutputFormat.class);
+
+ private final HiveConf allConf;
+ private final HCatDriver hcatDriver;
+
+ public TestHBaseBulkOutputFormat() {
+ allConf = getHiveConf();
+ allConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+ HCatSemanticAnalyzer.class.getName());
+ allConf.set(HiveConf.ConfVars.HADOOPFS.varname, getFileSystem().getUri().toString());
+ allConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, new Path(getTestDir(),"warehouse").toString());
+
+ //Add hbase properties
+ for (Map.Entry<String, String> el : getHbaseConf())
+ allConf.set(el.getKey(), el.getValue());
+ for (Map.Entry<String, String> el : getJobConf())
+ allConf.set(el.getKey(), el.getValue());
+
+ SessionState.start(new CliSessionState(allConf));
+ hcatDriver = new HCatDriver();
+ }
+
+ public static class MapWriteOldMapper implements org.apache.hadoop.mapred.Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void configure(JobConf job) {
+ }
+
+ @Override
+ public void map(LongWritable key, Text value,
+ OutputCollector<ImmutableBytesWritable, Put> output,
+ Reporter reporter) throws IOException {
+ String vals[] = value.toString().split(",");
+ Put put = new Put(Bytes.toBytes(vals[0]));
+ for(int i=1;i<vals.length;i++) {
+ String pair[] = vals[i].split(":");
+ put.add(Bytes.toBytes("my_family"),
+ Bytes.toBytes(pair[0]),
+ Bytes.toBytes(pair[1]));
+ }
+ output.collect(new ImmutableBytesWritable(Bytes.toBytes(vals[0])),put);
+ }
+
+ }
+
+ public static class MapWrite extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
+
+ @Override
+ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ String vals[] = value.toString().split(",");
+ Put put = new Put(Bytes.toBytes(vals[0]));
+ for(int i=1;i<vals.length;i++) {
+ String pair[] = vals[i].split(":");
+ put.add(Bytes.toBytes("my_family"),
+ Bytes.toBytes(pair[0]),
+ Bytes.toBytes(pair[1]));
+ }
+ context.write(new ImmutableBytesWritable(Bytes.toBytes(vals[0])),put);
+ }
+ }
+
+ public static class MapHCatWrite extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+ @Override
+ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ OutputJobInfo jobInfo = (OutputJobInfo)HCatUtil.deserialize(context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+ HCatRecord record = new DefaultHCatRecord(3);
+ HCatSchema schema = jobInfo.getOutputSchema();
+ String vals[] = value.toString().split(",");
+ record.setInteger("key",schema,Integer.parseInt(vals[0]));
+ for(int i=1;i<vals.length;i++) {
+ String pair[] = vals[i].split(":");
+ record.set(pair[0],schema,pair[1]);
+ }
+ context.write(null,record);
+ }
+ }
+
+ @Test
+ public void hbaseBulkOutputFormatTest() throws IOException, ClassNotFoundException, InterruptedException {
+ String testName = "hbaseBulkOutputFormatTest";
+ Path methodTestDir = new Path(getTestDir(),testName);
+ LOG.info("starting: "+testName);
+
+ String tableName = newTableName(testName).toLowerCase();
+ String familyName = "my_family";
+ byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+ //include hbase config in conf file
+ Configuration conf = new Configuration(allConf);
+
+ //create table
+ conf.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
+ createTable(tableName, new String[]{familyName});
+
+ String data[] = {"1,english:one,spanish:uno",
+ "2,english:two,spanish:dos",
+ "3,english:three,spanish:tres"};
+
+
+
+ // input/output settings
+ Path inputPath = new Path(methodTestDir,"mr_input");
+ FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
+ for(String line: data)
+ os.write(Bytes.toBytes(line + "\n"));
+ os.close();
+ Path interPath = new Path(methodTestDir,"inter");
+ //create job
+ JobConf job = new JobConf(conf);
+ job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(MapWriteOldMapper.class);
+
+ job.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class);
+ org.apache.hadoop.mapred.TextInputFormat.setInputPaths(job, inputPath);
+
+ job.setOutputFormat(HBaseBulkOutputFormat.class);
+ org.apache.hadoop.mapred.SequenceFileOutputFormat.setOutputPath(job, interPath);
+
+ //manually create transaction
+ RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+ try {
+ OutputJobInfo outputJobInfo = OutputJobInfo.create("default", tableName, null);
+ Transaction txn = rm.beginWriteTransaction(tableName, Arrays.asList(familyName));
+ outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
+ HCatUtil.serialize(txn));
+ job.set(HCatConstants.HCAT_KEY_OUTPUT_INFO,
+ HCatUtil.serialize(outputJobInfo));
+ } finally {
+ rm.close();
+ }
+
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputValueClass(HCatRecord.class);
+
+ job.setOutputKeyClass(ImmutableBytesWritable.class);
+ job.setOutputValueClass(HCatRecord.class);
+
+ job.setNumReduceTasks(0);
+
+ RunningJob runJob = JobClient.runJob(job);
+ runJob.waitForCompletion();
+ assertTrue(runJob.isSuccessful());
+
+ //verify
+ HTable table = new HTable(conf, tableName);
+ Scan scan = new Scan();
+ scan.addFamily(familyNameBytes);
+ ResultScanner scanner = table.getScanner(scan);
+ int index=0;
+ for(Result result: scanner) {
+ String vals[] = data[index].toString().split(",");
+ for(int i=1;i<vals.length;i++) {
+ String pair[] = vals[i].split(":");
+ assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+ assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+ }
+ index++;
+ }
+ //test if load count is the same
+ assertEquals(data.length,index);
+ //test if scratch directory was erased
+ assertFalse(FileSystem.get(job).exists(interPath));
+ }
+
+ @Test
+ public void importSequenceFileTest() throws IOException, ClassNotFoundException, InterruptedException {
+ String testName = "importSequenceFileTest";
+ Path methodTestDir = new Path(getTestDir(),testName);
+ LOG.info("starting: "+testName);
+
+ String tableName = newTableName(testName).toLowerCase();
+ String familyName = "my_family";
+ byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+ //include hbase config in conf file
+ Configuration conf = new Configuration(allConf);
+
+ //create table
+ createTable(tableName,new String[]{familyName});
+
+ String data[] = {"1,english:one,spanish:uno",
+ "2,english:two,spanish:dos",
+ "3,english:three,spanish:tres"};
+
+
+
+ // input/output settings
+ Path inputPath = new Path(methodTestDir,"mr_input");
+ getFileSystem().mkdirs(inputPath);
+ FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
+ for(String line: data)
+ os.write(Bytes.toBytes(line + "\n"));
+ os.close();
+ Path interPath = new Path(methodTestDir,"inter");
+ Path scratchPath = new Path(methodTestDir,"scratch");
+
+
+ //create job
+ Job job = new Job(conf, testName);
+ job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(MapWrite.class);
+
+ job.setInputFormatClass(TextInputFormat.class);
+ TextInputFormat.setInputPaths(job, inputPath);
+
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ SequenceFileOutputFormat.setOutputPath(job,interPath);
+
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputValueClass(Put.class);
+
+ job.setOutputKeyClass(ImmutableBytesWritable.class);
+ job.setOutputValueClass(Put.class);
+
+ job.setNumReduceTasks(0);
+ assertTrue(job.waitForCompletion(true));
+
+ job = new Job(new Configuration(allConf),testName+"_importer");
+ assertTrue(ImportSequenceFile.runJob(job, tableName, interPath, scratchPath));
+
+ //verify
+ HTable table = new HTable(conf, tableName);
+ Scan scan = new Scan();
+ scan.addFamily(familyNameBytes);
+ ResultScanner scanner = table.getScanner(scan);
+ int index=0;
+ for(Result result: scanner) {
+ String vals[] = data[index].toString().split(",");
+ for(int i=1;i<vals.length;i++) {
+ String pair[] = vals[i].split(":");
+ assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+ assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+ }
+ index++;
+ }
+ //test if load count is the same
+ assertEquals(data.length,index);
+ //test if scratch directory was erased
+ assertFalse(FileSystem.get(job.getConfiguration()).exists(scratchPath));
+ }
+
+ @Test
+ public void bulkModeHCatOutputFormatTest() throws Exception {
+ String testName = "bulkModeHCatOutputFormatTest";
+ Path methodTestDir = new Path(getTestDir(),testName);
+ LOG.info("starting: "+testName);
+
+ String databaseName = testName.toLowerCase();
+ String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
+ String tableName = newTableName(testName).toLowerCase();
+ String familyName = "my_family";
+ byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+
+ //include hbase config in conf file
+ Configuration conf = new Configuration(allConf);
+ conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+
+ String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'";
+ String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+ "(key int, english string, spanish string) STORED BY " +
+ "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+ "TBLPROPERTIES ('"+HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY+"'='true',"+
+ "'hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')" ;
+
+ assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+ assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
+
+ String data[] = {"1,english:ONE,spanish:UNO",
+ "2,english:TWO,spanish:DOS",
+ "3,english:THREE,spanish:TRES"};
+
+ // input/output settings
+ Path inputPath = new Path(methodTestDir,"mr_input");
+ getFileSystem().mkdirs(inputPath);
+ //create multiple files so we can test with multiple mappers
+ for(int i=0;i<data.length;i++) {
+ FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile"+i+".txt"));
+ os.write(Bytes.toBytes(data[i] + "\n"));
+ os.close();
+ }
+
+ //create job
+ Job job = new Job(conf,testName);
+ job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(MapHCatWrite.class);
+
+ job.setInputFormatClass(TextInputFormat.class);
+ TextInputFormat.setInputPaths(job, inputPath);
+
+
+ job.setOutputFormatClass(HCatOutputFormat.class);
+ OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,tableName,null);
+ HCatOutputFormat.setOutput(job,outputJobInfo);
+
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(HCatRecord.class);
+
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(HCatRecord.class);
+
+ job.setNumReduceTasks(0);
+
+ assertTrue(job.waitForCompletion(true));
+ RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+ try {
+ TableSnapshot snapshot = rm.createSnapshot(databaseName+"."+tableName);
+ for(String el: snapshot.getColumnFamilies()) {
+ assertEquals(1,snapshot.getRevision(el));
+ }
+ } finally {
+ rm.close();
+ }
+
+ //verify
+ HTable table = new HTable(conf, databaseName+"."+tableName);
+ Scan scan = new Scan();
+ scan.addFamily(familyNameBytes);
+ ResultScanner scanner = table.getScanner(scan);
+ int index=0;
+ for(Result result: scanner) {
+ String vals[] = data[index].toString().split(",");
+ for(int i=1;i<vals.length;i++) {
+ String pair[] = vals[i].split(":");
+ assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+ assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+ assertEquals(1l,result.getColumn(familyNameBytes,Bytes.toBytes(pair[0])).get(0).getTimestamp());
+ }
+ index++;
+ }
+ //test if load count is the same
+ assertEquals(data.length,index);
+ }
+
+ @Test
+ public void bulkModeHCatOutputFormatTestWithDefaultDB() throws Exception {
+ String testName = "bulkModeHCatOutputFormatTestWithDefaultDB";
+ Path methodTestDir = new Path(getTestDir(),testName);
+
+ String databaseName = "default";
+ String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
+ String tableName = newTableName(testName).toLowerCase();
+ String familyName = "my_family";
+ byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+
+ //include hbase config in conf file
+ Configuration conf = new Configuration(allConf);
+ conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+
+ String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'";
+ String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+ "(key int, english string, spanish string) STORED BY " +
+ "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+ "TBLPROPERTIES ('"+HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY+"'='true',"+
+ "'hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')" ;
+
+ assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+ assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
+
+ String data[] = {"1,english:ONE,spanish:UNO",
+ "2,english:TWO,spanish:DOS",
+ "3,english:THREE,spanish:TRES"};
+
+ // input/output settings
+ Path inputPath = new Path(methodTestDir,"mr_input");
+ getFileSystem().mkdirs(inputPath);
+ FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
+ for(String line: data)
+ os.write(Bytes.toBytes(line + "\n"));
+ os.close();
+
+ //create job
+ Job job = new Job(conf,testName);
+ job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(MapHCatWrite.class);
+
+ job.setInputFormatClass(TextInputFormat.class);
+ TextInputFormat.setInputPaths(job, inputPath);
+
+
+ job.setOutputFormatClass(HCatOutputFormat.class);
+ OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,tableName,null);
+ HCatOutputFormat.setOutput(job,outputJobInfo);
+
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(HCatRecord.class);
+
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(HCatRecord.class);
+
+ job.setNumReduceTasks(0);
+
+ assertTrue(job.waitForCompletion(true));
+
+ //verify
+ HTable table = new HTable(conf, tableName);
+ Scan scan = new Scan();
+ scan.addFamily(familyNameBytes);
+ ResultScanner scanner = table.getScanner(scan);
+ int index=0;
+ for(Result result: scanner) {
+ String vals[] = data[index].toString().split(",");
+ for(int i=1;i<vals.length;i++) {
+ String pair[] = vals[i].split(":");
+ assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+ assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+ }
+ index++;
+ }
+ //test if load count is the same
+ assertEquals(data.length,index);
+ }
+
+ @Test
+ public void bulkModeAbortTest() throws Exception {
+ String testName = "bulkModeAbortTest";
+ Path methodTestDir = new Path(getTestDir(), testName);
+ String databaseName = testName.toLowerCase();
+ String dbDir = new Path(methodTestDir, "DB_" + testName).toString();
+ String tableName = newTableName(testName).toLowerCase();
+ String familyName = "my_family";
+
+ // include hbase config in conf file
+ Configuration conf = new Configuration(allConf);
+ conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+ String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir
+ + "'";
+ String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+ "(key int, english string, spanish string) STORED BY " +
+ "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+ "TBLPROPERTIES ('" + HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY + "'='true'," +
+ "'hbase.columns.mapping'=':key," + familyName + ":english," + familyName
+ + ":spanish')";
+
+ assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+ assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
+
+ String data[] = {"1,english:ONE,spanish:UNO",
+ "2,english:TWO,spanish:DOS",
+ "3,english:THREE,spanish:TRES"};
+
+ Path inputPath = new Path(methodTestDir, "mr_input");
+ getFileSystem().mkdirs(inputPath);
+ // create multiple files so we can test with multiple mappers
+ for (int i = 0; i < data.length; i++) {
+ FSDataOutputStream os = getFileSystem().create(
+ new Path(inputPath, "inputFile" + i + ".txt"));
+ os.write(Bytes.toBytes(data[i] + "\n"));
+ os.close();
+ }
+
+ Path workingDir = new Path(methodTestDir, "mr_abort");
+ OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,
+ tableName, null);
+ Job job = configureJob(testName,
+ conf, workingDir, MapWriteAbortTransaction.class,
+ outputJobInfo, inputPath);
+ job.waitForCompletion(true);
+ assertFalse(job.waitForCompletion(true));
+
+ // verify that revision manager has it as aborted transaction
+ RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+ try {
+ TableSnapshot snapshot = rm.createSnapshot(databaseName + "." + tableName);
+ for (String family : snapshot.getColumnFamilies()) {
+ assertEquals(1, snapshot.getRevision(family));
+ List<FamilyRevision> abortedWriteTransactions = rm.getAbortedWriteTransactions(
+ databaseName + "." + tableName, family);
+ assertEquals(1, abortedWriteTransactions.size());
+ assertEquals(1, abortedWriteTransactions.get(0).getRevision());
+ }
+ } finally {
+ rm.close();
+ }
+
+ //verify that hbase does not have any of the records.
+ //Since records are only written during commitJob,
+ //hbase should not have any records.
+ HTable table = new HTable(conf, databaseName + "." + tableName);
+ Scan scan = new Scan();
+ scan.addFamily(Bytes.toBytes(familyName));
+ ResultScanner scanner = table.getScanner(scan);
+ assertFalse(scanner.iterator().hasNext());
+
+ // verify that the input storage driver returns empty results.
+ Path outputDir = new Path(getTestDir(),
+ "mapred/testHBaseTableBulkIgnoreAbortedTransactions");
+ FileSystem fs = getFileSystem();
+ if (fs.exists(outputDir)) {
+ fs.delete(outputDir, true);
+ }
+ job = new Job(conf, "hbase-bulk-aborted-transaction");
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(MapReadAbortedTransaction.class);
+ job.setInputFormatClass(HCatInputFormat.class);
+ InputJobInfo inputJobInfo = InputJobInfo.create(databaseName,
+ tableName, null, null, null);
+ HCatInputFormat.setInput(job, inputJobInfo);
+ job.setOutputFormatClass(TextOutputFormat.class);
+ TextOutputFormat.setOutputPath(job, outputDir);
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(Text.class);
+ job.setNumReduceTasks(0);
+ assertTrue(job.waitForCompletion(true));
+ }
+
+ private Job configureJob(String jobName, Configuration conf,
+ Path workingDir, Class<? extends Mapper> mapperClass,
+ OutputJobInfo outputJobInfo, Path inputPath) throws IOException {
+ Job job = new Job(conf, jobName);
+ job.setWorkingDirectory(workingDir);
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(mapperClass);
+
+ job.setInputFormatClass(TextInputFormat.class);
+ TextInputFormat.setInputPaths(job, inputPath);
+ job.setOutputFormatClass(HCatOutputFormat.class);
+ HCatOutputFormat.setOutput(job, outputJobInfo);
+
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(HCatRecord.class);
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(HCatRecord.class);
+
+ job.setNumReduceTasks(0);
+ return job;
+ }
+
+}
+
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java?rev=1296568&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java Sat Mar 3 02:56:05 2012
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hcatalog.cli.HCatDriver;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.snapshot.FamilyRevision;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
+import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests HBaseDirectOutputFormat and HBaseHCatStorageHandler using a MiniCluster
+ */
+public class TestHBaseDirectOutputFormat extends SkeletonHBaseTest {
+
+ private final HiveConf allConf;
+ private final HCatDriver hcatDriver;
+
+ public TestHBaseDirectOutputFormat() {
+ allConf = getHiveConf();
+ allConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+ HCatSemanticAnalyzer.class.getName());
+ allConf.set(HiveConf.ConfVars.HADOOPFS.varname, getFileSystem().getUri().toString());
+ allConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, new Path(getTestDir(),"warehouse").toString());
+
+ //Add hbase properties
+ for (Map.Entry<String, String> el : getHbaseConf())
+ allConf.set(el.getKey(), el.getValue());
+ for (Map.Entry<String, String> el : getJobConf())
+ allConf.set(el.getKey(), el.getValue());
+
+ SessionState.start(new CliSessionState(allConf));
+ hcatDriver = new HCatDriver();
+ }
+
+ @Test
+ public void directOutputFormatTest() throws IOException, ClassNotFoundException, InterruptedException {
+ String testName = "directOutputFormatTest";
+ Path methodTestDir = new Path(getTestDir(),testName);
+
+ String tableName = newTableName(testName).toLowerCase();
+ String familyName = "my_family";
+ byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+ //include hbase config in conf file
+ Configuration conf = new Configuration(allConf);
+ conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+ //create table
+ createTable(tableName,new String[]{familyName});
+
+ String data[] = {"1,english:ONE,spanish:UNO",
+ "2,english:ONE,spanish:DOS",
+ "3,english:ONE,spanish:TRES"};
+
+
+
+ // input/output settings
+ Path inputPath = new Path(methodTestDir,"mr_input");
+ getFileSystem().mkdirs(inputPath);
+ FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
+ for(String line: data)
+ os.write(Bytes.toBytes(line + "\n"));
+ os.close();
+
+ //create job
+ JobConf job = new JobConf(conf);
+ job.setJobName(testName);
+ job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(MapWrite.class);
+
+ job.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class);
+ org.apache.hadoop.mapred.TextInputFormat.setInputPaths(job, inputPath);
+
+ job.setOutputFormat(HBaseDirectOutputFormat.class);
+ job.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
+
+ //manually create transaction
+ RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+ try {
+ OutputJobInfo outputJobInfo = OutputJobInfo.create("default", tableName, null);
+ Transaction txn = rm.beginWriteTransaction(tableName, Arrays.asList(familyName));
+ outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
+ HCatUtil.serialize(txn));
+ job.set(HCatConstants.HCAT_KEY_OUTPUT_INFO,
+ HCatUtil.serialize(outputJobInfo));
+ } finally {
+ rm.close();
+ }
+
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(HCatRecord.class);
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(HCatRecord.class);
+ job.setNumReduceTasks(0);
+
+ RunningJob runJob = JobClient.runJob(job);
+ runJob.waitForCompletion();
+ assertTrue(runJob.isSuccessful());
+
+ //verify
+ HTable table = new HTable(conf, tableName);
+ Scan scan = new Scan();
+ scan.addFamily(familyNameBytes);
+ ResultScanner scanner = table.getScanner(scan);
+ int index=0;
+ for(Result result: scanner) {
+ String vals[] = data[index].toString().split(",");
+ for(int i=1;i<vals.length;i++) {
+ String pair[] = vals[i].split(":");
+ assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+ assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+ }
+ index++;
+ }
+ assertEquals(data.length,index);
+ }
+
+ @Test
+ public void directHCatOutputFormatTest() throws Exception {
+ String testName = "directHCatOutputFormatTest";
+ Path methodTestDir = new Path(getTestDir(),testName);
+
+ String databaseName = testName.toLowerCase();
+ String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
+ String tableName = newTableName(testName).toLowerCase();
+ String familyName = "my_family";
+ byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+
+ //include hbase config in conf file
+ Configuration conf = new Configuration(allConf);
+ conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+
+ String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'";
+ String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+ "(key int, english string, spanish string) STORED BY " +
+ "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+ "TBLPROPERTIES (" +
+ "'hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')" ;
+
+ assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+ assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
+
+ String data[] = {"1,english:ONE,spanish:UNO",
+ "2,english:ONE,spanish:DOS",
+ "3,english:ONE,spanish:TRES"};
+
+ // input/output settings
+ Path inputPath = new Path(methodTestDir,"mr_input");
+ getFileSystem().mkdirs(inputPath);
+ //create multiple files so we can test with multiple mappers
+ for(int i=0;i<data.length;i++) {
+ FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile"+i+".txt"));
+ os.write(Bytes.toBytes(data[i] + "\n"));
+ os.close();
+ }
+
+ //create job
+ Path workingDir = new Path(methodTestDir, "mr_work");
+ OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,
+ tableName, null);
+ Job job = configureJob(testName, conf, workingDir, MapHCatWrite.class,
+ outputJobInfo, inputPath);
+ assertTrue(job.waitForCompletion(true));
+
+ RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+ try {
+ TableSnapshot snapshot = rm.createSnapshot(databaseName+"."+tableName);
+ for(String el: snapshot.getColumnFamilies()) {
+ assertEquals(1,snapshot.getRevision(el));
+ }
+ } finally {
+ rm.close();
+ }
+
+ //verify
+ HTable table = new HTable(conf, databaseName+"."+tableName);
+ Scan scan = new Scan();
+ scan.addFamily(familyNameBytes);
+ ResultScanner scanner = table.getScanner(scan);
+ int index=0;
+ for(Result result: scanner) {
+ String vals[] = data[index].toString().split(",");
+ for(int i=1;i<vals.length;i++) {
+ String pair[] = vals[i].split(":");
+ assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+ assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+ assertEquals(1l,result.getColumn(familyNameBytes,Bytes.toBytes(pair[0])).get(0).getTimestamp());
+ }
+ index++;
+ }
+ assertEquals(data.length,index);
+ }
+
+ @Test
+ public void directModeAbortTest() throws Exception {
+ String testName = "directModeAbortTest";
+ Path methodTestDir = new Path(getTestDir(), testName);
+ String databaseName = testName.toLowerCase();
+ String dbDir = new Path(methodTestDir, "DB_" + testName).toString();
+ String tableName = newTableName(testName).toLowerCase();
+ String familyName = "my_family";
+ byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+ // include hbase config in conf file
+ Configuration conf = new Configuration(allConf);
+ conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+ String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir
+ + "'";
+ String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+ "(key int, english string, spanish string) STORED BY " +
+ "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+ "TBLPROPERTIES (" +
+ "'hbase.columns.mapping'=':key," + familyName + ":english," + familyName +
+ ":spanish')";
+
+ assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+ assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
+
+ String data[] = {"1,english:ONE,spanish:UNO",
+ "2,english:TWO,spanish:DOS",
+ "3,english:THREE,spanish:TRES"};
+
+ Path inputPath = new Path(methodTestDir, "mr_input");
+ getFileSystem().mkdirs(inputPath);
+ // create multiple files so we can test with multiple mappers
+ for (int i = 0; i < data.length; i++) {
+ FSDataOutputStream os = getFileSystem().create(
+ new Path(inputPath, "inputFile" + i + ".txt"));
+ os.write(Bytes.toBytes(data[i] + "\n"));
+ os.close();
+ }
+
+ Path workingDir = new Path(methodTestDir, "mr_abort");
+ OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,
+ tableName, null);
+ Job job = configureJob(testName, conf, workingDir, MapWriteAbortTransaction.class,
+ outputJobInfo, inputPath);
+ job.waitForCompletion(true);
+ assertFalse(job.waitForCompletion(true));
+
+ // verify that revision manager has it as aborted transaction
+ RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+ try {
+ TableSnapshot snapshot = rm.createSnapshot(databaseName + "." + tableName);
+ for (String family : snapshot.getColumnFamilies()) {
+ assertEquals(1, snapshot.getRevision(family));
+ List<FamilyRevision> abortedWriteTransactions = rm.getAbortedWriteTransactions(
+ databaseName + "." + tableName, family);
+ assertEquals(1, abortedWriteTransactions.size());
+ assertEquals(1, abortedWriteTransactions.get(0).getRevision());
+ }
+ } finally {
+ rm.close();
+ }
+
+ // verify that hbase has the records of the successful maps.
+ HTable table = new HTable(conf, databaseName + "." + tableName);
+ Scan scan = new Scan();
+ scan.addFamily(familyNameBytes);
+ ResultScanner scanner = table.getScanner(scan);
+ int index = 0;
+ for (Result result : scanner) {
+ String vals[] = data[index].toString().split(",");
+ for (int i = 1; i < vals.length; i++) {
+ String pair[] = vals[i].split(":");
+ assertTrue(result.containsColumn(familyNameBytes, Bytes.toBytes(pair[0])));
+ assertEquals(pair[1],
+ Bytes.toString(result.getValue(familyNameBytes, Bytes.toBytes(pair[0]))));
+ assertEquals(1l, result.getColumn(familyNameBytes, Bytes.toBytes(pair[0])).get(0)
+ .getTimestamp());
+ }
+ index++;
+ }
+ assertEquals(data.length - 1, index);
+
+ // verify that the inputformat returns empty results.
+ Path outputDir = new Path(getTestDir(),
+ "mapred/testHBaseTableIgnoreAbortedTransactions");
+ FileSystem fs = getFileSystem();
+ if (fs.exists(outputDir)) {
+ fs.delete(outputDir, true);
+ }
+ job = new Job(conf, "hbase-aborted-transaction");
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(MapReadAbortedTransaction.class);
+ job.setInputFormatClass(HCatInputFormat.class);
+ InputJobInfo inputJobInfo = InputJobInfo.create(databaseName,
+ tableName, null, null, null);
+ HCatInputFormat.setInput(job, inputJobInfo);
+ job.setOutputFormatClass(TextOutputFormat.class);
+ TextOutputFormat.setOutputPath(job, outputDir);
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(Text.class);
+ job.setNumReduceTasks(0);
+ assertTrue(job.waitForCompletion(true));
+ }
+
+ private Job configureJob(String jobName, Configuration conf,
+ Path workingDir, Class<? extends Mapper> mapperClass,
+ OutputJobInfo outputJobInfo, Path inputPath) throws IOException {
+ Job job = new Job(conf, jobName);
+ job.setWorkingDirectory(workingDir);
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(mapperClass);
+
+ job.setInputFormatClass(TextInputFormat.class);
+ TextInputFormat.setInputPaths(job, inputPath);
+ job.setOutputFormatClass(HCatOutputFormat.class);
+ HCatOutputFormat.setOutput(job, outputJobInfo);
+
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(HCatRecord.class);
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(HCatRecord.class);
+
+ job.setNumReduceTasks(0);
+ return job;
+ }
+
+ public static class MapHCatWrite extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+
+ @Override
+ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ OutputJobInfo jobInfo = (OutputJobInfo)HCatUtil.deserialize(context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+ HCatRecord record = new DefaultHCatRecord(3);
+ HCatSchema schema = jobInfo.getOutputSchema();
+ String vals[] = value.toString().split(",");
+ record.setInteger("key",schema,Integer.parseInt(vals[0]));
+ for(int i=1;i<vals.length;i++) {
+ String pair[] = vals[i].split(":");
+ record.set(pair[0],schema,pair[1]);
+ }
+ context.write(null,record);
+ }
+ }
+
+ public static class MapWrite implements org.apache.hadoop.mapred.Mapper<LongWritable, Text, BytesWritable, Put> {
+
+ @Override
+ public void configure(JobConf job) {
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void map(LongWritable key, Text value,
+ OutputCollector<BytesWritable, Put> output, Reporter reporter)
+ throws IOException {
+ String vals[] = value.toString().split(",");
+ Put put = new Put(Bytes.toBytes(vals[0]));
+ for(int i=1;i<vals.length;i++) {
+ String pair[] = vals[i].split(":");
+ put.add(Bytes.toBytes("my_family"),
+ Bytes.toBytes(pair[0]),
+ Bytes.toBytes(pair[1]));
+ }
+ output.collect(null, put);
+ }
+ }
+
+ static class MapWriteAbortTransaction extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+
+ @Override
+ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ OutputJobInfo jobInfo = (OutputJobInfo)HCatUtil.deserialize(context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+ HCatRecord record = new DefaultHCatRecord(3);
+ HCatSchema schema = jobInfo.getOutputSchema();
+ String vals[] = value.toString().split(",");
+ record.setInteger("key", schema, Integer.parseInt(vals[0]));
+ if (vals[0].equals("3")) {
+ throw new IOException("Failing map to test abort");
+ }
+ for (int i = 1; i < vals.length; i++) {
+ String pair[] = vals[i].split(":");
+ record.set(pair[0], schema, pair[1]);
+ }
+ context.write(null, record);
+ }
+
+ }
+
+ static class MapReadAbortedTransaction
+ extends
+ Mapper<ImmutableBytesWritable, HCatRecord, WritableComparable<?>, Text> {
+
+ @Override
+ public void run(Context context) throws IOException,
+ InterruptedException {
+ setup(context);
+ if (context.nextKeyValue()) {
+ map(context.getCurrentKey(), context.getCurrentValue(), context);
+ while (context.nextKeyValue()) {
+ map(context.getCurrentKey(), context.getCurrentValue(),
+ context);
+ }
+ throw new IOException("There should have been no records");
+ }
+ cleanup(context);
+ }
+
+ @Override
+ public void map(ImmutableBytesWritable key, HCatRecord value,
+ Context context) throws IOException, InterruptedException {
+ System.out.println("HCat record value" + value.toString());
+ }
+ }
+}