Posted to commits@hive.apache.org by ha...@apache.org on 2013/09/12 03:21:29 UTC
svn commit: r1522098 [9/30] - in /hive/branches/vectorization: ./
beeline/src/test/org/apache/hive/beeline/src/test/ bin/ bin/ext/
common/src/java/org/apache/hadoop/hive/common/
common/src/java/org/apache/hadoop/hive/conf/ conf/ contrib/src/java/org/ap...
Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java Thu Sep 12 01:21:10 2013
@@ -47,13 +47,13 @@ import org.apache.hadoop.mapreduce.Mappe
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hcatalog.cli.HCatDriver;
-import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
-import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.data.DefaultHCatRecord;
-import org.apache.hcatalog.data.HCatRecord;
-import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.cli.HCatDriver;
+import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.hbase.HBaseBulkOutputFormat.HBaseBulkOutputCommitter;
import org.apache.hcatalog.hbase.TestHBaseDirectOutputFormat.MapReadAbortedTransaction;
import org.apache.hcatalog.hbase.TestHBaseDirectOutputFormat.MapWriteAbortTransaction;
@@ -62,9 +62,9 @@ import org.apache.hcatalog.hbase.snapsho
import org.apache.hcatalog.hbase.snapshot.RevisionManagerConfiguration;
import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
import org.apache.hcatalog.hbase.snapshot.Transaction;
-import org.apache.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;
import org.junit.Test;
import org.slf4j.Logger;
@@ -84,548 +84,548 @@ import static org.junit.Assert.assertTru
 * Including ImportSequenceFile and HBaseBulkOutputFormat
*/
public class TestHBaseBulkOutputFormat extends SkeletonHBaseTest {
- private final static Logger LOG = LoggerFactory.getLogger(TestHBaseBulkOutputFormat.class);
+ private final static Logger LOG = LoggerFactory.getLogger(TestHBaseBulkOutputFormat.class);
- private final HiveConf allConf;
- private final HCatDriver hcatDriver;
+ private final HiveConf allConf;
+ private final HCatDriver hcatDriver;
- public TestHBaseBulkOutputFormat() {
- allConf = getHiveConf();
- allConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
- HCatSemanticAnalyzer.class.getName());
- allConf.set(HiveConf.ConfVars.HADOOPFS.varname, getFileSystem().getUri().toString());
- allConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, new Path(getTestDir(), "warehouse").toString());
-
- //Add hbase properties
- for (Map.Entry<String, String> el : getHbaseConf())
- allConf.set(el.getKey(), el.getValue());
- for (Map.Entry<String, String> el : getJobConf())
- allConf.set(el.getKey(), el.getValue());
-
- HBaseConfiguration.merge(
- allConf,
- RevisionManagerConfiguration.create());
- SessionState.start(new CliSessionState(allConf));
- hcatDriver = new HCatDriver();
- }
-
- public static class MapWriteOldMapper implements org.apache.hadoop.mapred.Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
-
- @Override
- public void close() throws IOException {
- }
-
- @Override
- public void configure(JobConf job) {
- }
-
- @Override
- public void map(LongWritable key, Text value,
- OutputCollector<ImmutableBytesWritable, Put> output,
- Reporter reporter) throws IOException {
- String vals[] = value.toString().split(",");
- Put put = new Put(Bytes.toBytes(vals[0]));
- for (int i = 1; i < vals.length; i++) {
- String pair[] = vals[i].split(":");
- put.add(Bytes.toBytes("my_family"),
- Bytes.toBytes(pair[0]),
- Bytes.toBytes(pair[1]));
- }
- output.collect(new ImmutableBytesWritable(Bytes.toBytes(vals[0])), put);
- }
-
- }
-
- public static class MapWrite extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
-
- @Override
- public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- String vals[] = value.toString().split(",");
- Put put = new Put(Bytes.toBytes(vals[0]));
- for (int i = 1; i < vals.length; i++) {
- String pair[] = vals[i].split(":");
- put.add(Bytes.toBytes("my_family"),
- Bytes.toBytes(pair[0]),
- Bytes.toBytes(pair[1]));
- }
- context.write(new ImmutableBytesWritable(Bytes.toBytes(vals[0])), put);
- }
- }
-
- public static class MapHCatWrite extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
- @Override
- public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- OutputJobInfo jobInfo = (OutputJobInfo) HCatUtil.deserialize(context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
- HCatRecord record = new DefaultHCatRecord(3);
- HCatSchema schema = jobInfo.getOutputSchema();
- String vals[] = value.toString().split(",");
- record.setInteger("key", schema, Integer.parseInt(vals[0]));
- for (int i = 1; i < vals.length; i++) {
- String pair[] = vals[i].split(":");
- record.set(pair[0], schema, pair[1]);
- }
- context.write(null, record);
- }
- }
-
- @Test
- public void hbaseBulkOutputFormatTest() throws IOException, ClassNotFoundException, InterruptedException {
- String testName = "hbaseBulkOutputFormatTest";
- Path methodTestDir = new Path(getTestDir(), testName);
- LOG.info("starting: " + testName);
-
- String tableName = newTableName(testName).toLowerCase();
- String familyName = "my_family";
- byte[] familyNameBytes = Bytes.toBytes(familyName);
-
- //include hbase config in conf file
- Configuration conf = new Configuration(allConf);
-
- //create table
- conf.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
- conf.set("yarn.scheduler.capacity.root.queues", "default");
- conf.set("yarn.scheduler.capacity.root.default.capacity", "100");
- createTable(tableName, new String[]{familyName});
-
- String data[] = {"1,english:one,spanish:uno",
- "2,english:two,spanish:dos",
- "3,english:three,spanish:tres"};
-
-
- // input/output settings
- Path inputPath = new Path(methodTestDir, "mr_input");
- FSDataOutputStream os = getFileSystem().create(new Path(inputPath, "inputFile.txt"));
- for (String line : data)
- os.write(Bytes.toBytes(line + "\n"));
- os.close();
- Path interPath = new Path(methodTestDir, "inter");
- //create job
- JobConf job = new JobConf(conf);
- job.setWorkingDirectory(new Path(methodTestDir, "mr_work"));
- job.setJarByClass(this.getClass());
- job.setMapperClass(MapWriteOldMapper.class);
-
- job.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class);
- org.apache.hadoop.mapred.TextInputFormat.setInputPaths(job, inputPath);
-
- job.setOutputFormat(HBaseBulkOutputFormat.class);
- org.apache.hadoop.mapred.SequenceFileOutputFormat.setOutputPath(job, interPath);
- job.setOutputCommitter(HBaseBulkOutputCommitter.class);
-
- //manually create transaction
- RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
- try {
- OutputJobInfo outputJobInfo = OutputJobInfo.create("default", tableName, null);
- Transaction txn = rm.beginWriteTransaction(tableName, Arrays.asList(familyName));
- outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
- HCatUtil.serialize(txn));
- job.set(HCatConstants.HCAT_KEY_OUTPUT_INFO,
- HCatUtil.serialize(outputJobInfo));
- } finally {
- rm.close();
- }
-
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- job.setMapOutputValueClass(HCatRecord.class);
-
- job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(HCatRecord.class);
-
- job.setNumReduceTasks(0);
-
- RunningJob runJob = JobClient.runJob(job);
- runJob.waitForCompletion();
- assertTrue(runJob.isSuccessful());
-
- //verify
- HTable table = new HTable(conf, tableName);
- Scan scan = new Scan();
- scan.addFamily(familyNameBytes);
- ResultScanner scanner = table.getScanner(scan);
- int index = 0;
- for (Result result : scanner) {
- String vals[] = data[index].toString().split(",");
- for (int i = 1; i < vals.length; i++) {
- String pair[] = vals[i].split(":");
- assertTrue(result.containsColumn(familyNameBytes, Bytes.toBytes(pair[0])));
- assertEquals(pair[1], Bytes.toString(result.getValue(familyNameBytes, Bytes.toBytes(pair[0]))));
- }
- index++;
- }
- //test if load count is the same
- assertEquals(data.length, index);
- //test if scratch directory was erased
- assertFalse(FileSystem.get(job).exists(interPath));
- }
-
- @Test
- public void importSequenceFileTest() throws IOException, ClassNotFoundException, InterruptedException {
- String testName = "importSequenceFileTest";
- Path methodTestDir = new Path(getTestDir(), testName);
- LOG.info("starting: " + testName);
-
- String tableName = newTableName(testName).toLowerCase();
- String familyName = "my_family";
- byte[] familyNameBytes = Bytes.toBytes(familyName);
-
- //include hbase config in conf file
- Configuration conf = new Configuration(allConf);
-
- //create table
- createTable(tableName, new String[]{familyName});
-
- String data[] = {"1,english:one,spanish:uno",
- "2,english:two,spanish:dos",
- "3,english:three,spanish:tres"};
-
-
- // input/output settings
- Path inputPath = new Path(methodTestDir, "mr_input");
- getFileSystem().mkdirs(inputPath);
- FSDataOutputStream os = getFileSystem().create(new Path(inputPath, "inputFile.txt"));
- for (String line : data)
- os.write(Bytes.toBytes(line + "\n"));
- os.close();
- Path interPath = new Path(methodTestDir, "inter");
- Path scratchPath = new Path(methodTestDir, "scratch");
-
-
- //create job
- Job job = new Job(conf, testName);
- job.setWorkingDirectory(new Path(methodTestDir, "mr_work"));
- job.setJarByClass(this.getClass());
- job.setMapperClass(MapWrite.class);
-
- job.setInputFormatClass(TextInputFormat.class);
- TextInputFormat.setInputPaths(job, inputPath);
-
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- SequenceFileOutputFormat.setOutputPath(job, interPath);
-
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- job.setMapOutputValueClass(Put.class);
-
- job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(Put.class);
-
- job.setNumReduceTasks(0);
- assertTrue(job.waitForCompletion(true));
-
- job = new Job(new Configuration(allConf), testName + "_importer");
- assertTrue(ImportSequenceFile.runJob(job, tableName, interPath, scratchPath));
-
- //verify
- HTable table = new HTable(conf, tableName);
- Scan scan = new Scan();
- scan.addFamily(familyNameBytes);
- ResultScanner scanner = table.getScanner(scan);
- int index = 0;
- for (Result result : scanner) {
- String vals[] = data[index].toString().split(",");
- for (int i = 1; i < vals.length; i++) {
- String pair[] = vals[i].split(":");
- assertTrue(result.containsColumn(familyNameBytes, Bytes.toBytes(pair[0])));
- assertEquals(pair[1], Bytes.toString(result.getValue(familyNameBytes, Bytes.toBytes(pair[0]))));
- }
- index++;
- }
- //test if load count is the same
- assertEquals(data.length, index);
- //test if scratch directory was erased
- assertFalse(FileSystem.get(job.getConfiguration()).exists(scratchPath));
- }
-
- @Test
- public void bulkModeHCatOutputFormatTest() throws Exception {
- String testName = "bulkModeHCatOutputFormatTest";
- Path methodTestDir = new Path(getTestDir(), testName);
- LOG.info("starting: " + testName);
-
- String databaseName = testName.toLowerCase();
- String dbDir = new Path(methodTestDir, "DB_" + testName).toString();
- String tableName = newTableName(testName).toLowerCase();
- String familyName = "my_family";
- byte[] familyNameBytes = Bytes.toBytes(familyName);
-
-
- //include hbase config in conf file
- Configuration conf = new Configuration(allConf);
- conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
-
-
- String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'";
- String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
- "(key int, english string, spanish string) STORED BY " +
- "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
- "TBLPROPERTIES ('" + HBaseConstants.PROPERTY_BULK_OUTPUT_MODE_KEY + "'='true'," +
- "'hbase.columns.mapping'=':key," + familyName + ":english," + familyName + ":spanish')";
-
- assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
- assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
-
- String data[] = {"1,english:ONE,spanish:UNO",
- "2,english:TWO,spanish:DOS",
- "3,english:THREE,spanish:TRES"};
-
- // input/output settings
- Path inputPath = new Path(methodTestDir, "mr_input");
- getFileSystem().mkdirs(inputPath);
- //create multiple files so we can test with multiple mappers
- for (int i = 0; i < data.length; i++) {
- FSDataOutputStream os = getFileSystem().create(new Path(inputPath, "inputFile" + i + ".txt"));
- os.write(Bytes.toBytes(data[i] + "\n"));
- os.close();
- }
-
- //create job
- Job job = new Job(conf, testName);
- job.setWorkingDirectory(new Path(methodTestDir, "mr_work"));
- job.setJarByClass(this.getClass());
- job.setMapperClass(MapHCatWrite.class);
-
- job.setInputFormatClass(TextInputFormat.class);
- TextInputFormat.setInputPaths(job, inputPath);
-
-
- job.setOutputFormatClass(HCatOutputFormat.class);
- OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName, tableName, null);
- HCatOutputFormat.setOutput(job, outputJobInfo);
-
- job.setMapOutputKeyClass(BytesWritable.class);
- job.setMapOutputValueClass(HCatRecord.class);
-
- job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(HCatRecord.class);
-
- job.setNumReduceTasks(0);
-
- assertTrue(job.waitForCompletion(true));
- RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
- try {
- TableSnapshot snapshot = rm.createSnapshot(databaseName + "." + tableName);
- for (String el : snapshot.getColumnFamilies()) {
- assertEquals(1, snapshot.getRevision(el));
- }
- } finally {
- rm.close();
- }
-
- //verify
- HTable table = new HTable(conf, databaseName + "." + tableName);
- Scan scan = new Scan();
- scan.addFamily(familyNameBytes);
- ResultScanner scanner = table.getScanner(scan);
- int index = 0;
- for (Result result : scanner) {
- String vals[] = data[index].toString().split(",");
- for (int i = 1; i < vals.length; i++) {
- String pair[] = vals[i].split(":");
- assertTrue(result.containsColumn(familyNameBytes, Bytes.toBytes(pair[0])));
- assertEquals(pair[1], Bytes.toString(result.getValue(familyNameBytes, Bytes.toBytes(pair[0]))));
- assertEquals(1l, result.getColumn(familyNameBytes, Bytes.toBytes(pair[0])).get(0).getTimestamp());
- }
- index++;
- }
- //test if load count is the same
- assertEquals(data.length, index);
- }
-
- @Test
- public void bulkModeHCatOutputFormatTestWithDefaultDB() throws Exception {
- String testName = "bulkModeHCatOutputFormatTestWithDefaultDB";
- Path methodTestDir = new Path(getTestDir(), testName);
-
- String databaseName = "default";
- String dbDir = new Path(methodTestDir, "DB_" + testName).toString();
- String tableName = newTableName(testName).toLowerCase();
- String familyName = "my_family";
- byte[] familyNameBytes = Bytes.toBytes(familyName);
-
-
- //include hbase config in conf file
- Configuration conf = new Configuration(allConf);
- conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
-
-
- String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'";
- String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
- "(key int, english string, spanish string) STORED BY " +
- "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
- "TBLPROPERTIES ('" + HBaseConstants.PROPERTY_BULK_OUTPUT_MODE_KEY + "'='true'," +
- "'hbase.columns.mapping'=':key," + familyName + ":english," + familyName + ":spanish')";
-
- assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
- assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
-
- String data[] = {"1,english:ONE,spanish:UNO",
- "2,english:TWO,spanish:DOS",
- "3,english:THREE,spanish:TRES"};
-
- // input/output settings
- Path inputPath = new Path(methodTestDir, "mr_input");
- getFileSystem().mkdirs(inputPath);
- FSDataOutputStream os = getFileSystem().create(new Path(inputPath, "inputFile.txt"));
- for (String line : data)
- os.write(Bytes.toBytes(line + "\n"));
- os.close();
-
- //create job
- Job job = new Job(conf, testName);
- job.setWorkingDirectory(new Path(methodTestDir, "mr_work"));
- job.setJarByClass(this.getClass());
- job.setMapperClass(MapHCatWrite.class);
-
- job.setInputFormatClass(TextInputFormat.class);
- TextInputFormat.setInputPaths(job, inputPath);
-
-
- job.setOutputFormatClass(HCatOutputFormat.class);
- OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName, tableName, null);
- HCatOutputFormat.setOutput(job, outputJobInfo);
-
- job.setMapOutputKeyClass(BytesWritable.class);
- job.setMapOutputValueClass(HCatRecord.class);
-
- job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(HCatRecord.class);
-
- job.setNumReduceTasks(0);
-
- assertTrue(job.waitForCompletion(true));
-
- //verify
- HTable table = new HTable(conf, tableName);
- Scan scan = new Scan();
- scan.addFamily(familyNameBytes);
- ResultScanner scanner = table.getScanner(scan);
- int index = 0;
- for (Result result : scanner) {
- String vals[] = data[index].toString().split(",");
- for (int i = 1; i < vals.length; i++) {
- String pair[] = vals[i].split(":");
- assertTrue(result.containsColumn(familyNameBytes, Bytes.toBytes(pair[0])));
- assertEquals(pair[1], Bytes.toString(result.getValue(familyNameBytes, Bytes.toBytes(pair[0]))));
- }
- index++;
- }
- //test if load count is the same
- assertEquals(data.length, index);
- }
-
- @Test
- public void bulkModeAbortTest() throws Exception {
- String testName = "bulkModeAbortTest";
- Path methodTestDir = new Path(getTestDir(), testName);
- String databaseName = testName.toLowerCase();
- String dbDir = new Path(methodTestDir, "DB_" + testName).toString();
- String tableName = newTableName(testName).toLowerCase();
- String familyName = "my_family";
-
- // include hbase config in conf file
- Configuration conf = new Configuration(allConf);
- conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
-
- String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir
- + "'";
- String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
- "(key int, english string, spanish string) STORED BY " +
- "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
- "TBLPROPERTIES ('" + HBaseConstants.PROPERTY_BULK_OUTPUT_MODE_KEY + "'='true'," +
- "'hbase.columns.mapping'=':key," + familyName + ":english," + familyName
- + ":spanish')";
-
- assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
- assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
-
- String data[] = {"1,english:ONE,spanish:UNO",
- "2,english:TWO,spanish:DOS",
- "3,english:THREE,spanish:TRES"};
-
- Path inputPath = new Path(methodTestDir, "mr_input");
- getFileSystem().mkdirs(inputPath);
- // create multiple files so we can test with multiple mappers
- for (int i = 0; i < data.length; i++) {
- FSDataOutputStream os = getFileSystem().create(
- new Path(inputPath, "inputFile" + i + ".txt"));
- os.write(Bytes.toBytes(data[i] + "\n"));
- os.close();
- }
-
- Path workingDir = new Path(methodTestDir, "mr_abort");
- OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,
- tableName, null);
- Job job = configureJob(testName,
- conf, workingDir, MapWriteAbortTransaction.class,
- outputJobInfo, inputPath);
- assertFalse(job.waitForCompletion(true));
-
- // verify that revision manager has it as aborted transaction
- RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
- try {
- TableSnapshot snapshot = rm.createSnapshot(databaseName + "." + tableName);
- for (String family : snapshot.getColumnFamilies()) {
- assertEquals(1, snapshot.getRevision(family));
- List<FamilyRevision> abortedWriteTransactions = rm.getAbortedWriteTransactions(
- databaseName + "." + tableName, family);
- assertEquals(1, abortedWriteTransactions.size());
- assertEquals(1, abortedWriteTransactions.get(0).getRevision());
- }
- } finally {
- rm.close();
- }
-
- //verify that hbase does not have any of the records.
- //Since records are only written during commitJob,
- //hbase should not have any records.
- HTable table = new HTable(conf, databaseName + "." + tableName);
- Scan scan = new Scan();
- scan.addFamily(Bytes.toBytes(familyName));
- ResultScanner scanner = table.getScanner(scan);
- assertFalse(scanner.iterator().hasNext());
-
- // verify that the storage handler input format returns empty results.
- Path outputDir = new Path(getTestDir(),
- "mapred/testHBaseTableBulkIgnoreAbortedTransactions");
- FileSystem fs = getFileSystem();
- if (fs.exists(outputDir)) {
- fs.delete(outputDir, true);
- }
- job = new Job(conf, "hbase-bulk-aborted-transaction");
- job.setJarByClass(this.getClass());
- job.setMapperClass(MapReadAbortedTransaction.class);
- job.setInputFormatClass(HCatInputFormat.class);
- HCatInputFormat.setInput(job, databaseName, tableName);
- job.setOutputFormatClass(TextOutputFormat.class);
- TextOutputFormat.setOutputPath(job, outputDir);
- job.setMapOutputKeyClass(BytesWritable.class);
- job.setMapOutputValueClass(Text.class);
- job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(Text.class);
- job.setNumReduceTasks(0);
- assertTrue(job.waitForCompletion(true));
- }
-
- private Job configureJob(String jobName, Configuration conf,
- Path workingDir, Class<? extends Mapper> mapperClass,
- OutputJobInfo outputJobInfo, Path inputPath) throws IOException {
- Job job = new Job(conf, jobName);
- job.setWorkingDirectory(workingDir);
- job.setJarByClass(this.getClass());
- job.setMapperClass(mapperClass);
-
- job.setInputFormatClass(TextInputFormat.class);
- TextInputFormat.setInputPaths(job, inputPath);
- job.setOutputFormatClass(HCatOutputFormat.class);
- HCatOutputFormat.setOutput(job, outputJobInfo);
-
- job.setMapOutputKeyClass(BytesWritable.class);
- job.setMapOutputValueClass(HCatRecord.class);
- job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(HCatRecord.class);
+ public TestHBaseBulkOutputFormat() {
+ allConf = getHiveConf();
+ allConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+ HCatSemanticAnalyzer.class.getName());
+ allConf.set(HiveConf.ConfVars.HADOOPFS.varname, getFileSystem().getUri().toString());
+ allConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, new Path(getTestDir(), "warehouse").toString());
+
+ //Add hbase properties
+ for (Map.Entry<String, String> el : getHbaseConf())
+ allConf.set(el.getKey(), el.getValue());
+ for (Map.Entry<String, String> el : getJobConf())
+ allConf.set(el.getKey(), el.getValue());
+
+ HBaseConfiguration.merge(
+ allConf,
+ RevisionManagerConfiguration.create());
+ SessionState.start(new CliSessionState(allConf));
+ hcatDriver = new HCatDriver();
+ }
- job.setNumReduceTasks(0);
- return job;
+ public static class MapWriteOldMapper implements org.apache.hadoop.mapred.Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void configure(JobConf job) {
+ }
+
+ @Override
+ public void map(LongWritable key, Text value,
+ OutputCollector<ImmutableBytesWritable, Put> output,
+ Reporter reporter) throws IOException {
+ String vals[] = value.toString().split(",");
+ Put put = new Put(Bytes.toBytes(vals[0]));
+ for (int i = 1; i < vals.length; i++) {
+ String pair[] = vals[i].split(":");
+ put.add(Bytes.toBytes("my_family"),
+ Bytes.toBytes(pair[0]),
+ Bytes.toBytes(pair[1]));
+ }
+ output.collect(new ImmutableBytesWritable(Bytes.toBytes(vals[0])), put);
+ }
+
+ }
+
+ public static class MapWrite extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
+
+ @Override
+ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ String vals[] = value.toString().split(",");
+ Put put = new Put(Bytes.toBytes(vals[0]));
+ for (int i = 1; i < vals.length; i++) {
+ String pair[] = vals[i].split(":");
+ put.add(Bytes.toBytes("my_family"),
+ Bytes.toBytes(pair[0]),
+ Bytes.toBytes(pair[1]));
+ }
+ context.write(new ImmutableBytesWritable(Bytes.toBytes(vals[0])), put);
+ }
+ }
+
+ public static class MapHCatWrite extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+ @Override
+ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ OutputJobInfo jobInfo = (OutputJobInfo) HCatUtil.deserialize(context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+ HCatRecord record = new DefaultHCatRecord(3);
+ HCatSchema schema = jobInfo.getOutputSchema();
+ String vals[] = value.toString().split(",");
+ record.setInteger("key", schema, Integer.parseInt(vals[0]));
+ for (int i = 1; i < vals.length; i++) {
+ String pair[] = vals[i].split(":");
+ record.set(pair[0], schema, pair[1]);
+ }
+ context.write(null, record);
+ }
+ }
+
+ @Test
+ public void hbaseBulkOutputFormatTest() throws IOException, ClassNotFoundException, InterruptedException {
+ String testName = "hbaseBulkOutputFormatTest";
+ Path methodTestDir = new Path(getTestDir(), testName);
+ LOG.info("starting: " + testName);
+
+ String tableName = newTableName(testName).toLowerCase();
+ String familyName = "my_family";
+ byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+ //include hbase config in conf file
+ Configuration conf = new Configuration(allConf);
+
+ //create table
+ conf.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
+ conf.set("yarn.scheduler.capacity.root.queues", "default");
+ conf.set("yarn.scheduler.capacity.root.default.capacity", "100");
+ createTable(tableName, new String[]{familyName});
+
+ String data[] = {"1,english:one,spanish:uno",
+ "2,english:two,spanish:dos",
+ "3,english:three,spanish:tres"};
+
+
+ // input/output settings
+ Path inputPath = new Path(methodTestDir, "mr_input");
+ FSDataOutputStream os = getFileSystem().create(new Path(inputPath, "inputFile.txt"));
+ for (String line : data)
+ os.write(Bytes.toBytes(line + "\n"));
+ os.close();
+ Path interPath = new Path(methodTestDir, "inter");
+ //create job
+ JobConf job = new JobConf(conf);
+ job.setWorkingDirectory(new Path(methodTestDir, "mr_work"));
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(MapWriteOldMapper.class);
+
+ job.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class);
+ org.apache.hadoop.mapred.TextInputFormat.setInputPaths(job, inputPath);
+
+ job.setOutputFormat(HBaseBulkOutputFormat.class);
+ org.apache.hadoop.mapred.SequenceFileOutputFormat.setOutputPath(job, interPath);
+ job.setOutputCommitter(HBaseBulkOutputCommitter.class);
+
+ //manually create transaction
+ RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+ try {
+ OutputJobInfo outputJobInfo = OutputJobInfo.create("default", tableName, null);
+ Transaction txn = rm.beginWriteTransaction(tableName, Arrays.asList(familyName));
+ outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
+ HCatUtil.serialize(txn));
+ job.set(HCatConstants.HCAT_KEY_OUTPUT_INFO,
+ HCatUtil.serialize(outputJobInfo));
+ } finally {
+ rm.close();
+ }
+
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputValueClass(HCatRecord.class);
+
+ job.setOutputKeyClass(ImmutableBytesWritable.class);
+ job.setOutputValueClass(HCatRecord.class);
+
+ job.setNumReduceTasks(0);
+
+ RunningJob runJob = JobClient.runJob(job);
+ runJob.waitForCompletion();
+ assertTrue(runJob.isSuccessful());
+
+ //verify
+ HTable table = new HTable(conf, tableName);
+ Scan scan = new Scan();
+ scan.addFamily(familyNameBytes);
+ ResultScanner scanner = table.getScanner(scan);
+ int index = 0;
+ for (Result result : scanner) {
+ String vals[] = data[index].toString().split(",");
+ for (int i = 1; i < vals.length; i++) {
+ String pair[] = vals[i].split(":");
+ assertTrue(result.containsColumn(familyNameBytes, Bytes.toBytes(pair[0])));
+ assertEquals(pair[1], Bytes.toString(result.getValue(familyNameBytes, Bytes.toBytes(pair[0]))));
+ }
+ index++;
+ }
+ //test if load count is the same
+ assertEquals(data.length, index);
+ //test if scratch directory was erased
+ assertFalse(FileSystem.get(job).exists(interPath));
+ }
+
+ @Test
+ public void importSequenceFileTest() throws IOException, ClassNotFoundException, InterruptedException {
+ String testName = "importSequenceFileTest";
+ Path methodTestDir = new Path(getTestDir(), testName);
+ LOG.info("starting: " + testName);
+
+ String tableName = newTableName(testName).toLowerCase();
+ String familyName = "my_family";
+ byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+ //include hbase config in conf file
+ Configuration conf = new Configuration(allConf);
+
+ //create table
+ createTable(tableName, new String[]{familyName});
+
+ String data[] = {"1,english:one,spanish:uno",
+ "2,english:two,spanish:dos",
+ "3,english:three,spanish:tres"};
+
+
+ // input/output settings
+ Path inputPath = new Path(methodTestDir, "mr_input");
+ getFileSystem().mkdirs(inputPath);
+ FSDataOutputStream os = getFileSystem().create(new Path(inputPath, "inputFile.txt"));
+ for (String line : data)
+ os.write(Bytes.toBytes(line + "\n"));
+ os.close();
+ Path interPath = new Path(methodTestDir, "inter");
+ Path scratchPath = new Path(methodTestDir, "scratch");
+
+
+ //create job
+ Job job = new Job(conf, testName);
+ job.setWorkingDirectory(new Path(methodTestDir, "mr_work"));
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(MapWrite.class);
+
+ job.setInputFormatClass(TextInputFormat.class);
+ TextInputFormat.setInputPaths(job, inputPath);
+
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ SequenceFileOutputFormat.setOutputPath(job, interPath);
+
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputValueClass(Put.class);
+
+ job.setOutputKeyClass(ImmutableBytesWritable.class);
+ job.setOutputValueClass(Put.class);
+
+ job.setNumReduceTasks(0);
+ assertTrue(job.waitForCompletion(true));
+
+ job = new Job(new Configuration(allConf), testName + "_importer");
+ assertTrue(ImportSequenceFile.runJob(job, tableName, interPath, scratchPath));
+
+ //verify
+ HTable table = new HTable(conf, tableName);
+ Scan scan = new Scan();
+ scan.addFamily(familyNameBytes);
+ ResultScanner scanner = table.getScanner(scan);
+ int index = 0;
+ for (Result result : scanner) {
+ String vals[] = data[index].toString().split(",");
+ for (int i = 1; i < vals.length; i++) {
+ String pair[] = vals[i].split(":");
+ assertTrue(result.containsColumn(familyNameBytes, Bytes.toBytes(pair[0])));
+ assertEquals(pair[1], Bytes.toString(result.getValue(familyNameBytes, Bytes.toBytes(pair[0]))));
+ }
+ index++;
+ }
+ //test if load count is the same
+ assertEquals(data.length, index);
+ //test if scratch directory was erased
+ assertFalse(FileSystem.get(job.getConfiguration()).exists(scratchPath));
+ }
+
+ @Test
+ public void bulkModeHCatOutputFormatTest() throws Exception {
+ String testName = "bulkModeHCatOutputFormatTest";
+ Path methodTestDir = new Path(getTestDir(), testName);
+ LOG.info("starting: " + testName);
+
+ String databaseName = testName.toLowerCase();
+ String dbDir = new Path(methodTestDir, "DB_" + testName).toString();
+ String tableName = newTableName(testName).toLowerCase();
+ String familyName = "my_family";
+ byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+
+ //include hbase config in conf file
+ Configuration conf = new Configuration(allConf);
+ conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+
+ String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'";
+ String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+ "(key int, english string, spanish string) STORED BY " +
+ "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+ "TBLPROPERTIES ('" + HBaseConstants.PROPERTY_BULK_OUTPUT_MODE_KEY + "'='true'," +
+ "'hbase.columns.mapping'=':key," + familyName + ":english," + familyName + ":spanish')";
+
+ assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+ assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
+
+ String data[] = {"1,english:ONE,spanish:UNO",
+ "2,english:TWO,spanish:DOS",
+ "3,english:THREE,spanish:TRES"};
+
+ // input/output settings
+ Path inputPath = new Path(methodTestDir, "mr_input");
+ getFileSystem().mkdirs(inputPath);
+ //create multiple files so we can test with multiple mappers
+ for (int i = 0; i < data.length; i++) {
+ FSDataOutputStream os = getFileSystem().create(new Path(inputPath, "inputFile" + i + ".txt"));
+ os.write(Bytes.toBytes(data[i] + "\n"));
+ os.close();
+ }
+
+ //create job
+ Job job = new Job(conf, testName);
+ job.setWorkingDirectory(new Path(methodTestDir, "mr_work"));
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(MapHCatWrite.class);
+
+ job.setInputFormatClass(TextInputFormat.class);
+ TextInputFormat.setInputPaths(job, inputPath);
+
+
+ job.setOutputFormatClass(HCatOutputFormat.class);
+ OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName, tableName, null);
+ HCatOutputFormat.setOutput(job, outputJobInfo);
+
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(HCatRecord.class);
+
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(HCatRecord.class);
+
+ job.setNumReduceTasks(0);
+
+ assertTrue(job.waitForCompletion(true));
+ RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+ try {
+ TableSnapshot snapshot = rm.createSnapshot(databaseName + "." + tableName);
+ for (String el : snapshot.getColumnFamilies()) {
+ assertEquals(1, snapshot.getRevision(el));
+ }
+ } finally {
+ rm.close();
+ }
+
+ //verify
+ HTable table = new HTable(conf, databaseName + "." + tableName);
+ Scan scan = new Scan();
+ scan.addFamily(familyNameBytes);
+ ResultScanner scanner = table.getScanner(scan);
+ int index = 0;
+ for (Result result : scanner) {
+ String vals[] = data[index].toString().split(",");
+ for (int i = 1; i < vals.length; i++) {
+ String pair[] = vals[i].split(":");
+ assertTrue(result.containsColumn(familyNameBytes, Bytes.toBytes(pair[0])));
+ assertEquals(pair[1], Bytes.toString(result.getValue(familyNameBytes, Bytes.toBytes(pair[0]))));
+ assertEquals(1l, result.getColumn(familyNameBytes, Bytes.toBytes(pair[0])).get(0).getTimestamp());
+ }
+ index++;
+ }
+ //test if load count is the same
+ assertEquals(data.length, index);
+ }
+
+ @Test
+ public void bulkModeHCatOutputFormatTestWithDefaultDB() throws Exception {
+ String testName = "bulkModeHCatOutputFormatTestWithDefaultDB";
+ Path methodTestDir = new Path(getTestDir(), testName);
+
+ String databaseName = "default";
+ String dbDir = new Path(methodTestDir, "DB_" + testName).toString();
+ String tableName = newTableName(testName).toLowerCase();
+ String familyName = "my_family";
+ byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+
+ //include hbase config in conf file
+ Configuration conf = new Configuration(allConf);
+ conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+
+ String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'";
+ String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+ "(key int, english string, spanish string) STORED BY " +
+ "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+ "TBLPROPERTIES ('" + HBaseConstants.PROPERTY_BULK_OUTPUT_MODE_KEY + "'='true'," +
+ "'hbase.columns.mapping'=':key," + familyName + ":english," + familyName + ":spanish')";
+
+ assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+ assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
+
+ String data[] = {"1,english:ONE,spanish:UNO",
+ "2,english:TWO,spanish:DOS",
+ "3,english:THREE,spanish:TRES"};
+
+ // input/output settings
+ Path inputPath = new Path(methodTestDir, "mr_input");
+ getFileSystem().mkdirs(inputPath);
+ FSDataOutputStream os = getFileSystem().create(new Path(inputPath, "inputFile.txt"));
+ for (String line : data)
+ os.write(Bytes.toBytes(line + "\n"));
+ os.close();
+
+ //create job
+ Job job = new Job(conf, testName);
+ job.setWorkingDirectory(new Path(methodTestDir, "mr_work"));
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(MapHCatWrite.class);
+
+ job.setInputFormatClass(TextInputFormat.class);
+ TextInputFormat.setInputPaths(job, inputPath);
+
+
+ job.setOutputFormatClass(HCatOutputFormat.class);
+ OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName, tableName, null);
+ HCatOutputFormat.setOutput(job, outputJobInfo);
+
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(HCatRecord.class);
+
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(HCatRecord.class);
+
+ job.setNumReduceTasks(0);
+
+ assertTrue(job.waitForCompletion(true));
+
+ //verify
+ HTable table = new HTable(conf, tableName);
+ Scan scan = new Scan();
+ scan.addFamily(familyNameBytes);
+ ResultScanner scanner = table.getScanner(scan);
+ int index = 0;
+ for (Result result : scanner) {
+ String vals[] = data[index].toString().split(",");
+ for (int i = 1; i < vals.length; i++) {
+ String pair[] = vals[i].split(":");
+ assertTrue(result.containsColumn(familyNameBytes, Bytes.toBytes(pair[0])));
+ assertEquals(pair[1], Bytes.toString(result.getValue(familyNameBytes, Bytes.toBytes(pair[0]))));
+ }
+ index++;
+ }
+ //test if load count is the same
+ assertEquals(data.length, index);
+ }
+
+ @Test
+ public void bulkModeAbortTest() throws Exception {
+ String testName = "bulkModeAbortTest";
+ Path methodTestDir = new Path(getTestDir(), testName);
+ String databaseName = testName.toLowerCase();
+ String dbDir = new Path(methodTestDir, "DB_" + testName).toString();
+ String tableName = newTableName(testName).toLowerCase();
+ String familyName = "my_family";
+
+ // include hbase config in conf file
+ Configuration conf = new Configuration(allConf);
+ conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+ String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir
+ + "'";
+ String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+ "(key int, english string, spanish string) STORED BY " +
+ "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+ "TBLPROPERTIES ('" + HBaseConstants.PROPERTY_BULK_OUTPUT_MODE_KEY + "'='true'," +
+ "'hbase.columns.mapping'=':key," + familyName + ":english," + familyName
+ + ":spanish')";
+
+ assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+ assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
+
+ String data[] = {"1,english:ONE,spanish:UNO",
+ "2,english:TWO,spanish:DOS",
+ "3,english:THREE,spanish:TRES"};
+
+ Path inputPath = new Path(methodTestDir, "mr_input");
+ getFileSystem().mkdirs(inputPath);
+ // create multiple files so we can test with multiple mappers
+ for (int i = 0; i < data.length; i++) {
+ FSDataOutputStream os = getFileSystem().create(
+ new Path(inputPath, "inputFile" + i + ".txt"));
+ os.write(Bytes.toBytes(data[i] + "\n"));
+ os.close();
+ }
+
+ Path workingDir = new Path(methodTestDir, "mr_abort");
+ OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,
+ tableName, null);
+ Job job = configureJob(testName,
+ conf, workingDir, MapWriteAbortTransaction.class,
+ outputJobInfo, inputPath);
+ assertFalse(job.waitForCompletion(true));
+
+ // verify that revision manager has it as aborted transaction
+ RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+ try {
+ TableSnapshot snapshot = rm.createSnapshot(databaseName + "." + tableName);
+ for (String family : snapshot.getColumnFamilies()) {
+ assertEquals(1, snapshot.getRevision(family));
+ List<FamilyRevision> abortedWriteTransactions = rm.getAbortedWriteTransactions(
+ databaseName + "." + tableName, family);
+ assertEquals(1, abortedWriteTransactions.size());
+ assertEquals(1, abortedWriteTransactions.get(0).getRevision());
+ }
+ } finally {
+ rm.close();
+ }
+
+ //verify that hbase does not have any of the records.
+ //Since records are only written during commitJob,
+ //hbase should not have any records.
+ HTable table = new HTable(conf, databaseName + "." + tableName);
+ Scan scan = new Scan();
+ scan.addFamily(Bytes.toBytes(familyName));
+ ResultScanner scanner = table.getScanner(scan);
+ assertFalse(scanner.iterator().hasNext());
+
+ // verify that the storage handler input format returns empty results.
+ Path outputDir = new Path(getTestDir(),
+ "mapred/testHBaseTableBulkIgnoreAbortedTransactions");
+ FileSystem fs = getFileSystem();
+ if (fs.exists(outputDir)) {
+ fs.delete(outputDir, true);
}
+ job = new Job(conf, "hbase-bulk-aborted-transaction");
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(MapReadAbortedTransaction.class);
+ job.setInputFormatClass(HCatInputFormat.class);
+ HCatInputFormat.setInput(job, databaseName, tableName);
+ job.setOutputFormatClass(TextOutputFormat.class);
+ TextOutputFormat.setOutputPath(job, outputDir);
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(Text.class);
+ job.setNumReduceTasks(0);
+ assertTrue(job.waitForCompletion(true));
+ }
+
+ private Job configureJob(String jobName, Configuration conf,
+ Path workingDir, Class<? extends Mapper> mapperClass,
+ OutputJobInfo outputJobInfo, Path inputPath) throws IOException {
+ Job job = new Job(conf, jobName);
+ job.setWorkingDirectory(workingDir);
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(mapperClass);
+
+ job.setInputFormatClass(TextInputFormat.class);
+ TextInputFormat.setInputPaths(job, inputPath);
+ job.setOutputFormatClass(HCatOutputFormat.class);
+ HCatOutputFormat.setOutput(job, outputJobInfo);
+
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(HCatRecord.class);
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(HCatRecord.class);
+
+ job.setNumReduceTasks(0);
+ return job;
+ }
}
Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java Thu Sep 12 01:21:10 2013
@@ -48,21 +48,21 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hcatalog.cli.HCatDriver;
-import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
-import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.data.DefaultHCatRecord;
-import org.apache.hcatalog.data.HCatRecord;
-import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.cli.HCatDriver;
+import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.hbase.snapshot.FamilyRevision;
import org.apache.hcatalog.hbase.snapshot.RevisionManager;
import org.apache.hcatalog.hbase.snapshot.RevisionManagerConfiguration;
import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
import org.apache.hcatalog.hbase.snapshot.Transaction;
-import org.apache.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;
import org.junit.Test;
import java.io.IOException;
@@ -80,422 +80,422 @@ import static org.junit.Assert.assertTru
*/
public class TestHBaseDirectOutputFormat extends SkeletonHBaseTest {
- private final HiveConf allConf;
- private final HCatDriver hcatDriver;
+ private final HiveConf allConf;
+ private final HCatDriver hcatDriver;
- public TestHBaseDirectOutputFormat() {
- allConf = getHiveConf();
- allConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
- HCatSemanticAnalyzer.class.getName());
- allConf.set(HiveConf.ConfVars.HADOOPFS.varname, getFileSystem().getUri().toString());
- allConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, new Path(getTestDir(), "warehouse").toString());
-
- //Add hbase properties
- for (Map.Entry<String, String> el : getHbaseConf())
- allConf.set(el.getKey(), el.getValue());
- for (Map.Entry<String, String> el : getJobConf())
- allConf.set(el.getKey(), el.getValue());
- HBaseConfiguration.merge(
- allConf,
- RevisionManagerConfiguration.create());
- SessionState.start(new CliSessionState(allConf));
- hcatDriver = new HCatDriver();
- }
-
- @Test
- public void directOutputFormatTest() throws IOException, ClassNotFoundException, InterruptedException {
- String testName = "directOutputFormatTest";
- Path methodTestDir = new Path(getTestDir(), testName);
-
- String tableName = newTableName(testName).toLowerCase();
- String familyName = "my_family";
- byte[] familyNameBytes = Bytes.toBytes(familyName);
-
- //include hbase config in conf file
- Configuration conf = new Configuration(allConf);
- conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
-
- //create table
- createTable(tableName, new String[]{familyName});
-
- String data[] = {"1,english:ONE,spanish:UNO",
- "2,english:ONE,spanish:DOS",
- "3,english:ONE,spanish:TRES"};
-
-
- // input/output settings
- Path inputPath = new Path(methodTestDir, "mr_input");
- getFileSystem().mkdirs(inputPath);
- FSDataOutputStream os = getFileSystem().create(new Path(inputPath, "inputFile.txt"));
- for (String line : data)
- os.write(Bytes.toBytes(line + "\n"));
- os.close();
-
- //create job
- JobConf job = new JobConf(conf);
- job.setJobName(testName);
- job.setWorkingDirectory(new Path(methodTestDir, "mr_work"));
- job.setJarByClass(this.getClass());
- job.setMapperClass(MapWrite.class);
-
- job.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class);
- org.apache.hadoop.mapred.TextInputFormat.setInputPaths(job, inputPath);
-
- job.setOutputFormat(HBaseDirectOutputFormat.class);
- job.set(TableOutputFormat.OUTPUT_TABLE, tableName);
- job.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
-
- //manually create transaction
- RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
- try {
- OutputJobInfo outputJobInfo = OutputJobInfo.create("default", tableName, null);
- Transaction txn = rm.beginWriteTransaction(tableName, Arrays.asList(familyName));
- outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
- HCatUtil.serialize(txn));
- job.set(HCatConstants.HCAT_KEY_OUTPUT_INFO,
- HCatUtil.serialize(outputJobInfo));
- } finally {
- rm.close();
- }
-
- job.setMapOutputKeyClass(BytesWritable.class);
- job.setMapOutputValueClass(HCatRecord.class);
- job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(HCatRecord.class);
- job.setNumReduceTasks(0);
-
- RunningJob runJob = JobClient.runJob(job);
- runJob.waitForCompletion();
- assertTrue(runJob.isSuccessful());
-
- //verify
- HTable table = new HTable(conf, tableName);
- Scan scan = new Scan();
- scan.addFamily(familyNameBytes);
- ResultScanner scanner = table.getScanner(scan);
- int index = 0;
- for (Result result : scanner) {
- String vals[] = data[index].toString().split(",");
- for (int i = 1; i < vals.length; i++) {
- String pair[] = vals[i].split(":");
- assertTrue(result.containsColumn(familyNameBytes, Bytes.toBytes(pair[0])));
- assertEquals(pair[1], Bytes.toString(result.getValue(familyNameBytes, Bytes.toBytes(pair[0]))));
- }
- index++;
- }
- assertEquals(data.length, index);
+ public TestHBaseDirectOutputFormat() {
+ allConf = getHiveConf();
+ allConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+ HCatSemanticAnalyzer.class.getName());
+ allConf.set(HiveConf.ConfVars.HADOOPFS.varname, getFileSystem().getUri().toString());
+ allConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, new Path(getTestDir(), "warehouse").toString());
+
+ //Add hbase properties
+ for (Map.Entry<String, String> el : getHbaseConf())
+ allConf.set(el.getKey(), el.getValue());
+ for (Map.Entry<String, String> el : getJobConf())
+ allConf.set(el.getKey(), el.getValue());
+ HBaseConfiguration.merge(
+ allConf,
+ RevisionManagerConfiguration.create());
+ SessionState.start(new CliSessionState(allConf));
+ hcatDriver = new HCatDriver();
+ }
+
+ @Test
+ public void directOutputFormatTest() throws IOException, ClassNotFoundException, InterruptedException {
+ String testName = "directOutputFormatTest";
+ Path methodTestDir = new Path(getTestDir(), testName);
+
+ String tableName = newTableName(testName).toLowerCase();
+ String familyName = "my_family";
+ byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+ //include hbase config in conf file
+ Configuration conf = new Configuration(allConf);
+ conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+ //create table
+ createTable(tableName, new String[]{familyName});
+
+ String data[] = {"1,english:ONE,spanish:UNO",
+ "2,english:ONE,spanish:DOS",
+ "3,english:ONE,spanish:TRES"};
+
+
+ // input/output settings
+ Path inputPath = new Path(methodTestDir, "mr_input");
+ getFileSystem().mkdirs(inputPath);
+ FSDataOutputStream os = getFileSystem().create(new Path(inputPath, "inputFile.txt"));
+ for (String line : data)
+ os.write(Bytes.toBytes(line + "\n"));
+ os.close();
+
+ //create job
+ JobConf job = new JobConf(conf);
+ job.setJobName(testName);
+ job.setWorkingDirectory(new Path(methodTestDir, "mr_work"));
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(MapWrite.class);
+
+ job.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class);
+ org.apache.hadoop.mapred.TextInputFormat.setInputPaths(job, inputPath);
+
+ job.setOutputFormat(HBaseDirectOutputFormat.class);
+ job.set(TableOutputFormat.OUTPUT_TABLE, tableName);
+ job.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
+
+ //manually create transaction
+ RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+ try {
+ OutputJobInfo outputJobInfo = OutputJobInfo.create("default", tableName, null);
+ Transaction txn = rm.beginWriteTransaction(tableName, Arrays.asList(familyName));
+ outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
+ HCatUtil.serialize(txn));
+ job.set(HCatConstants.HCAT_KEY_OUTPUT_INFO,
+ HCatUtil.serialize(outputJobInfo));
+ } finally {
+ rm.close();
}
- @Test
- public void directHCatOutputFormatTest() throws Exception {
- String testName = "directHCatOutputFormatTest";
- Path methodTestDir = new Path(getTestDir(), testName);
-
- String databaseName = testName;
- String dbDir = new Path(methodTestDir, "DB_" + testName).toString();
- String tableName = newTableName(testName);
- String familyName = "my_family";
- byte[] familyNameBytes = Bytes.toBytes(familyName);
- //Table name will be lower case unless specified by hbase.table.name property
- String hbaseTableName = (databaseName + "." + tableName).toLowerCase();
-
- //include hbase config in conf file
- Configuration conf = new Configuration(allConf);
- conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
-
-
- String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'";
- String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
- "(key int, english string, spanish string) STORED BY " +
- "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
- "TBLPROPERTIES (" +
- "'hbase.columns.mapping'=':key," + familyName + ":english," + familyName + ":spanish')";
-
- assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
- assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
-
- String data[] = {"1,english:ONE,spanish:UNO",
- "2,english:ONE,spanish:DOS",
- "3,english:ONE,spanish:TRES"};
-
- // input/output settings
- Path inputPath = new Path(methodTestDir, "mr_input");
- getFileSystem().mkdirs(inputPath);
- //create multiple files so we can test with multiple mappers
- for (int i = 0; i < data.length; i++) {
- FSDataOutputStream os = getFileSystem().create(new Path(inputPath, "inputFile" + i + ".txt"));
- os.write(Bytes.toBytes(data[i] + "\n"));
- os.close();
- }
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(HCatRecord.class);
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(HCatRecord.class);
+ job.setNumReduceTasks(0);
+
+ RunningJob runJob = JobClient.runJob(job);
+ runJob.waitForCompletion();
+ assertTrue(runJob.isSuccessful());
+
+ //verify
+ HTable table = new HTable(conf, tableName);
+ Scan scan = new Scan();
+ scan.addFamily(familyNameBytes);
+ ResultScanner scanner = table.getScanner(scan);
+ int index = 0;
+ for (Result result : scanner) {
+ String vals[] = data[index].toString().split(",");
+ for (int i = 1; i < vals.length; i++) {
+ String pair[] = vals[i].split(":");
+ assertTrue(result.containsColumn(familyNameBytes, Bytes.toBytes(pair[0])));
+ assertEquals(pair[1], Bytes.toString(result.getValue(familyNameBytes, Bytes.toBytes(pair[0]))));
+ }
+ index++;
+ }
+ assertEquals(data.length, index);
+ }
- //create job
- Path workingDir = new Path(methodTestDir, "mr_work");
- OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,
- tableName, null);
- Job job = configureJob(testName, conf, workingDir, MapHCatWrite.class,
- outputJobInfo, inputPath);
- assertTrue(job.waitForCompletion(true));
-
- RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
- try {
- TableSnapshot snapshot = rm.createSnapshot(hbaseTableName);
- for (String el : snapshot.getColumnFamilies()) {
- assertEquals(1, snapshot.getRevision(el));
- }
- } finally {
- rm.close();
- }
+ @Test
+ public void directHCatOutputFormatTest() throws Exception {
+ String testName = "directHCatOutputFormatTest";
+ Path methodTestDir = new Path(getTestDir(), testName);
+
+ String databaseName = testName;
+ String dbDir = new Path(methodTestDir, "DB_" + testName).toString();
+ String tableName = newTableName(testName);
+ String familyName = "my_family";
+ byte[] familyNameBytes = Bytes.toBytes(familyName);
+ //Table name will be lower case unless specified by hbase.table.name property
+ String hbaseTableName = (databaseName + "." + tableName).toLowerCase();
+
+ //include hbase config in conf file
+ Configuration conf = new Configuration(allConf);
+ conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+
+ String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'";
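+ // the columns mapping binds key/english/spanish to the HBase row key and two qualifiers in my_family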
+ String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+ "(key int, english string, spanish string) STORED BY " +
+ "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+ "TBLPROPERTIES (" +
+ "'hbase.columns.mapping'=':key," + familyName + ":english," + familyName + ":spanish')";
+
+ assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+ assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
+
+ String data[] = {"1,english:ONE,spanish:UNO",
+ "2,english:ONE,spanish:DOS",
+ "3,english:ONE,spanish:TRES"};
+
+ // input/output settings
+ Path inputPath = new Path(methodTestDir, "mr_input");
+ getFileSystem().mkdirs(inputPath);
+ //create multiple files so we can test with multiple mappers
+ for (int i = 0; i < data.length; i++) {
+ FSDataOutputStream os = getFileSystem().create(new Path(inputPath, "inputFile" + i + ".txt"));
+ os.write(Bytes.toBytes(data[i] + "\n"));
+ os.close();
+ }
- //verify
- HTable table = new HTable(conf, hbaseTableName);
- Scan scan = new Scan();
- scan.addFamily(familyNameBytes);
- ResultScanner scanner = table.getScanner(scan);
- int index = 0;
- for (Result result : scanner) {
- String vals[] = data[index].split(",");
- for (int i = 1; i < vals.length; i++) {
- String pair[] = vals[i].split(":");
- assertTrue(result.containsColumn(familyNameBytes, Bytes.toBytes(pair[0])));
- assertEquals(pair[1], Bytes.toString(result.getValue(familyNameBytes, Bytes.toBytes(pair[0]))));
- assertEquals(1L, result.getColumn(familyNameBytes, Bytes.toBytes(pair[0])).get(0).getTimestamp());
- }
- index++;
- }
- assertEquals(data.length, index);
+ //create job
+ Path workingDir = new Path(methodTestDir, "mr_work");
+ OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,
+ tableName, null);
+ Job job = configureJob(testName, conf, workingDir, MapHCatWrite.class,
+ outputJobInfo, inputPath);
+ assertTrue(job.waitForCompletion(true));
+
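+ // a single committed write should leave every column family at revision 1 in the revision manager snapshot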
+ RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+ try {
+ TableSnapshot snapshot = rm.createSnapshot(hbaseTableName);
+ for (String el : snapshot.getColumnFamilies()) {
+ assertEquals(1, snapshot.getRevision(el));
+ }
+ } finally {
+ rm.close();
}
- @Test
- public void directModeAbortTest() throws Exception {
- String testName = "directModeAbortTest";
- Path methodTestDir = new Path(getTestDir(), testName);
- String databaseName = testName;
- String dbDir = new Path(methodTestDir, "DB_" + testName).toString();
- String tableName = newTableName(testName);
- String familyName = "my_family";
- byte[] familyNameBytes = Bytes.toBytes(familyName);
- //Table name as specified by hbase.table.name property
- String hbaseTableName = tableName;
-
- // include hbase config in conf file
- Configuration conf = new Configuration(allConf);
- conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
-
- String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir
- + "'";
- String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
- "(key int, english string, spanish string) STORED BY " +
- "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
- "TBLPROPERTIES (" +
- "'hbase.columns.mapping'=':key," + familyName + ":english," + familyName +
- ":spanish','hbase.table.name'='" + hbaseTableName + "')";
-
- assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
- assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
-
- String data[] = {"1,english:ONE,spanish:UNO",
- "2,english:TWO,spanish:DOS",
- "3,english:THREE,spanish:TRES"};
-
- Path inputPath = new Path(methodTestDir, "mr_input");
- getFileSystem().mkdirs(inputPath);
- // create multiple files so we can test with multiple mappers
- for (int i = 0; i < data.length; i++) {
- FSDataOutputStream os = getFileSystem().create(
- new Path(inputPath, "inputFile" + i + ".txt"));
- os.write(Bytes.toBytes(data[i] + "\n"));
- os.close();
- }
+ //verify
+ HTable table = new HTable(conf, hbaseTableName);
+ Scan scan = new Scan();
+ scan.addFamily(familyNameBytes);
+ ResultScanner scanner = table.getScanner(scan);
+ int index = 0;
+ for (Result result : scanner) {
+ String vals[] = data[index].split(",");
+ for (int i = 1; i < vals.length; i++) {
+ String pair[] = vals[i].split(":");
+ assertTrue(result.containsColumn(familyNameBytes, Bytes.toBytes(pair[0])));
+ assertEquals(pair[1], Bytes.toString(result.getValue(familyNameBytes, Bytes.toBytes(pair[0]))));
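+ // the cell timestamp should equal the write revision (1) asserted on the snapshot above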
+ assertEquals(1L, result.getColumn(familyNameBytes, Bytes.toBytes(pair[0])).get(0).getTimestamp());
+ }
+ index++;
+ }
+ assertEquals(data.length, index);
+ }
- Path workingDir = new Path(methodTestDir, "mr_abort");
- OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,
- tableName, null);
- Job job = configureJob(testName, conf, workingDir, MapWriteAbortTransaction.class,
- outputJobInfo, inputPath);
- assertFalse(job.waitForCompletion(true));
-
- // verify that the revision manager records it as an aborted transaction
- RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
- try {
- TableSnapshot snapshot = rm.createSnapshot(hbaseTableName);
- for (String family : snapshot.getColumnFamilies()) {
- assertEquals(1, snapshot.getRevision(family));
- List<FamilyRevision> abortedWriteTransactions = rm.getAbortedWriteTransactions(
- hbaseTableName, family);
- assertEquals(1, abortedWriteTransactions.size());
- assertEquals(1, abortedWriteTransactions.get(0).getRevision());
- }
- } finally {
- rm.close();
- }
+ @Test
+ public void directModeAbortTest() throws Exception {
+ String testName = "directModeAbortTest";
+ Path methodTestDir = new Path(getTestDir(), testName);
+ String databaseName = testName;
+ String dbDir = new Path(methodTestDir, "DB_" + testName).toString();
+ String tableName = newTableName(testName);
+ String familyName = "my_family";
+ byte[] familyNameBytes = Bytes.toBytes(familyName);
+ //Table name as specified by hbase.table.name property
+ String hbaseTableName = tableName;
+
+ // include hbase config in conf file
+ Configuration conf = new Configuration(allConf);
+ conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+ String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir
+ + "'";
+ String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+ "(key int, english string, spanish string) STORED BY " +
+ "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+ "TBLPROPERTIES (" +
+ "'hbase.columns.mapping'=':key," + familyName + ":english," + familyName +
+ ":spanish','hbase.table.name'='" + hbaseTableName + "')";
+
+ assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+ assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
+
+ String data[] = {"1,english:ONE,spanish:UNO",
+ "2,english:TWO,spanish:DOS",
+ "3,english:THREE,spanish:TRES"};
+
+ Path inputPath = new Path(methodTestDir, "mr_input");
+ getFileSystem().mkdirs(inputPath);
+ // create multiple files so we can test with multiple mappers
+ for (int i = 0; i < data.length; i++) {
+ FSDataOutputStream os = getFileSystem().create(
+ new Path(inputPath, "inputFile" + i + ".txt"));
+ os.write(Bytes.toBytes(data[i] + "\n"));
+ os.close();
+ }
- // verify that HBase kept the records from the successful map tasks.
- HTable table = new HTable(conf, hbaseTableName);
- Scan scan = new Scan();
- scan.addFamily(familyNameBytes);
- ResultScanner scanner = table.getScanner(scan);
- int count = 0;
- for (Result result : scanner) {
- String key = Bytes.toString(result.getRow());
- assertFalse(MapWriteAbortTransaction.failedKey.equals(key));
- int index = Integer.parseInt(key) - 1;
- String vals[] = data[index].split(",");
- for (int i = 1; i < vals.length; i++) {
- String pair[] = vals[i].split(":");
- assertTrue(result.containsColumn(familyNameBytes, Bytes.toBytes(pair[0])));
- assertEquals(pair[1],
- Bytes.toString(result.getValue(familyNameBytes, Bytes.toBytes(pair[0]))));
- assertEquals(1L, result.getColumn(familyNameBytes, Bytes.toBytes(pair[0])).get(0)
- .getTimestamp());
- }
- count++;
- }
- assertEquals(data.length - 1, count);
+ Path workingDir = new Path(methodTestDir, "mr_abort");
+ OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,
+ tableName, null);
+ Job job = configureJob(testName, conf, workingDir, MapWriteAbortTransaction.class,
+ outputJobInfo, inputPath);
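+ // MapWriteAbortTransaction throws on its third record, so this job is expected to fail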
+ assertFalse(job.waitForCompletion(true));
+
+ // verify that the revision manager records it as an aborted transaction
+ RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+ try {
+ TableSnapshot snapshot = rm.createSnapshot(hbaseTableName);
+ for (String family : snapshot.getColumnFamilies()) {
+ assertEquals(1, snapshot.getRevision(family));
+ List<FamilyRevision> abortedWriteTransactions = rm.getAbortedWriteTransactions(
+ hbaseTableName, family);
+ assertEquals(1, abortedWriteTransactions.size());
+ assertEquals(1, abortedWriteTransactions.get(0).getRevision());
+ }
+ } finally {
+ rm.close();
+ }
- // verify that the input format returns no records: aborted-transaction data must be filtered out.
- Path outputDir = new Path(getTestDir(),
- "mapred/testHBaseTableIgnoreAbortedTransactions");
- FileSystem fs = getFileSystem();
- if (fs.exists(outputDir)) {
- fs.delete(outputDir, true);
- }
- job = new Job(conf, "hbase-aborted-transaction");
- job.setJarByClass(this.getClass());
- job.setMapperClass(MapReadAbortedTransaction.class);
- job.setInputFormatClass(HCatInputFormat.class);
- HCatInputFormat.setInput(job, databaseName, tableName);
- job.setOutputFormatClass(TextOutputFormat.class);
- TextOutputFormat.setOutputPath(job, outputDir);
- job.setMapOutputKeyClass(BytesWritable.class);
- job.setMapOutputValueClass(Text.class);
- job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(Text.class);
- job.setNumReduceTasks(0);
- assertTrue(job.waitForCompletion(true));
- }
-
- private Job configureJob(String jobName, Configuration conf,
- Path workingDir, Class<? extends Mapper> mapperClass,
- OutputJobInfo outputJobInfo, Path inputPath) throws IOException {
- Job job = new Job(conf, jobName);
- job.setWorkingDirectory(workingDir);
- job.setJarByClass(this.getClass());
- job.setMapperClass(mapperClass);
-
- job.setInputFormatClass(TextInputFormat.class);
- TextInputFormat.setInputPaths(job, inputPath);
- job.setOutputFormatClass(HCatOutputFormat.class);
- HCatOutputFormat.setOutput(job, outputJobInfo);
- String txnString = job.getConfiguration().get(HBaseConstants.PROPERTY_WRITE_TXN_KEY);
- //Test that passing the same OutputJobInfo in multiple times creates only one transaction
- String jobString = job.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
- outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(jobString);
- Job job2 = new Job(conf);
- HCatOutputFormat.setOutput(job2, outputJobInfo);
- assertEquals(txnString, job2.getConfiguration().get(HBaseConstants.PROPERTY_WRITE_TXN_KEY));
- job.setMapOutputKeyClass(BytesWritable.class);
- job.setMapOutputValueClass(HCatRecord.class);
- job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(HCatRecord.class);
-
- job.setNumReduceTasks(0);
- return job;
- }
-
- public static class MapHCatWrite extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
-
- @Override
- public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- OutputJobInfo jobInfo = (OutputJobInfo) HCatUtil.deserialize(context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
- HCatRecord record = new DefaultHCatRecord(3);
- HCatSchema schema = jobInfo.getOutputSchema();
- String vals[] = value.toString().split(",");
- record.setInteger("key", schema, Integer.parseInt(vals[0]));
- for (int i = 1; i < vals.length; i++) {
- String pair[] = vals[i].split(":");
- record.set(pair[0], schema, pair[1]);
- }
- context.write(null, record);
- }
+ // verify that HBase kept the records from the successful map tasks.
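+ // direct-mode puts from the mappers that succeeded stay in HBase even though the transaction aborted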
+ HTable table = new HTable(conf, hbaseTableName);
+ Scan scan = new Scan();
+ scan.addFamily(familyNameBytes);
+ ResultScanner scanner = table.getScanner(scan);
+ int count = 0;
+ for (Result result : scanner) {
+ String key = Bytes.toString(result.getRow());
+ assertFalse(MapWriteAbortTransaction.failedKey.equals(key));
+ int index = Integer.parseInt(key) - 1;
+ String vals[] = data[index].split(",");
+ for (int i = 1; i < vals.length; i++) {
+ String pair[] = vals[i].split(":");
+ assertTrue(result.containsColumn(familyNameBytes, Bytes.toBytes(pair[0])));
+ assertEquals(pair[1],
+ Bytes.toString(result.getValue(familyNameBytes, Bytes.toBytes(pair[0]))));
+ assertEquals(1L, result.getColumn(familyNameBytes, Bytes.toBytes(pair[0])).get(0)
+ .getTimestamp());
+ }
+ count++;
}
+ assertEquals(data.length - 1, count);
- public static class MapWrite implements org.apache.hadoop.mapred.Mapper<LongWritable, Text, BytesWritable, Put> {
+ // verify that the input format returns no records: aborted-transaction data must be filtered out.
+ Path outputDir = new Path(getTestDir(),
+ "mapred/testHBaseTableIgnoreAbortedTransactions");
+ FileSystem fs = getFileSystem();
+ if (fs.exists(outputDir)) {
+ fs.delete(outputDir, true);
+ }
+ job = new Job(conf, "hbase-aborted-transaction");
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(MapReadAbortedTransaction.class);
+ job.setInputFormatClass(HCatInputFormat.class);
+ HCatInputFormat.setInput(job, databaseName, tableName);
+ job.setOutputFormatClass(TextOutputFormat.class);
+ TextOutputFormat.setOutputPath(job, outputDir);
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(Text.class);
+ job.setNumReduceTasks(0);
+ assertTrue(job.waitForCompletion(true));
+ }
+
+ private Job configureJob(String jobName, Configuration conf,
+ Path workingDir, Class<? extends Mapper> mapperClass,
+ OutputJobInfo outputJobInfo, Path inputPath) throws IOException {
+ Job job = new Job(conf, jobName);
+ job.setWorkingDirectory(workingDir);
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(mapperClass);
+
+ job.setInputFormatClass(TextInputFormat.class);
+ TextInputFormat.setInputPaths(job, inputPath);
+ job.setOutputFormatClass(HCatOutputFormat.class);
+ HCatOutputFormat.setOutput(job, outputJobInfo);
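+ // setOutput is expected to open the write transaction and stash its key in the job conf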
+ String txnString = job.getConfiguration().get(HBaseConstants.PROPERTY_WRITE_TXN_KEY);
+ //Test that passing the same OutputJobInfo in multiple times creates only one transaction
+ String jobString = job.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+ outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(jobString);
+ Job job2 = new Job(conf);
+ HCatOutputFormat.setOutput(job2, outputJobInfo);
+ assertEquals(txnString, job2.getConfiguration().get(HBaseConstants.PROPERTY_WRITE_TXN_KEY));
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(HCatRecord.class);
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(HCatRecord.class);
+
+ job.setNumReduceTasks(0);
+ return job;
+ }
+
+ public static class MapHCatWrite extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+
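+ // builds an HCatRecord from a "<key>,<col>:<val>,..." line using the output schema carried in the job conf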
+ @Override
+ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ OutputJobInfo jobInfo = (OutputJobInfo) HCatUtil.deserialize(context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+ HCatRecord record = new DefaultHCatRecord(3);
+ HCatSchema schema = jobInfo.getOutputSchema();
+ String vals[] = value.toString().split(",");
+ record.setInteger("key", schema, Integer.parseInt(vals[0]));
+ for (int i = 1; i < vals.length; i++) {
+ String pair[] = vals[i].split(":");
+ record.set(pair[0], schema, pair[1]);
+ }
+ context.write(null, record);
+ }
+ }
- @Override
- public void configure(JobConf job) {
- }
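+ // old mapred-API mapper that turns a CSV line into an HBase Put keyed by the first field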
+ public static class MapWrite implements org.apache.hadoop.mapred.Mapper<LongWritable, Text, BytesWritable, Put> {
- @Override
- public void close() throws IOException {
- }
+ @Override
+ public void configure(JobConf job) {
+ }
- @Override
- public void map(LongWritable key, Text value,
- OutputCollector<BytesWritable, Put> output, Reporter reporter)
- throws IOException {
- String vals[] = value.toString().split(",");
- Put put = new Put(Bytes.toBytes(vals[0]));
- for (int i = 1; i < vals.length; i++) {
- String pair[] = vals[i].split(":");
- put.add(Bytes.toBytes("my_family"),
- Bytes.toBytes(pair[0]),
- Bytes.toBytes(pair[1]));
- }
- output.collect(null, put);
- }
+ @Override
+ public void close() throws IOException {
}
- static class MapWriteAbortTransaction extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
- public static String failedKey;
- private static int count = 0;
-
- @Override
- public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- OutputJobInfo jobInfo = (OutputJobInfo) HCatUtil.deserialize(context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
- HCatRecord record = new DefaultHCatRecord(3);
- HCatSchema schema = jobInfo.getOutputSchema();
- String vals[] = value.toString().split(",");
- record.setInteger("key", schema, Integer.parseInt(vals[0]));
- synchronized (MapWriteAbortTransaction.class) {
- if (count == 2) {
- failedKey = vals[0];
- throw new IOException("Failing map to test abort");
- }
- for (int i = 1; i < vals.length; i++) {
- String pair[] = vals[i].split(":");
- record.set(pair[0], schema, pair[1]);
- }
- context.write(null, record);
- count++;
- }
+ @Override
+ public void map(LongWritable key, Text value,
+ OutputCollector<BytesWritable, Put> output, Reporter reporter)
+ throws IOException {
+ String vals[] = value.toString().split(",");
+ Put put = new Put(Bytes.toBytes(vals[0]));
+ for (int i = 1; i < vals.length; i++) {
+ String pair[] = vals[i].split(":");
+ put.add(Bytes.toBytes("my_family"),
+ Bytes.toBytes(pair[0]),
+ Bytes.toBytes(pair[1]));
+ }
+ output.collect(null, put);
+ }
+ }
- }
+ static class MapWriteAbortTransaction extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+ public static String failedKey;
+ private static int count = 0;
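+ // count is static and guarded by a class-level lock, so the third record processed in this JVM triggers the simulated failure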
+
+ @Override
+ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ OutputJobInfo jobInfo = (OutputJobInfo) HCatUtil.deserialize(context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+ HCatRecord record = new DefaultHCatRecord(3);
+ HCatSchema schema = jobInfo.getOutputSchema();
+ String vals[] = value.toString().split(",");
+ record.setInteger("key", schema, Integer.parseInt(vals[0]));
+ synchronized (MapWriteAbortTransaction.class) {
+ if (count == 2) {
+ failedKey = vals[0];
+ throw new IOException("Failing map to test abort");
+ }
+ for (int i = 1; i < vals.length; i++) {
+ String pair[] = vals[i].split(":");
+ record.set(pair[0], schema, pair[1]);
+ }
+ context.write(null, record);
+ count++;
+ }
}
- static class MapReadAbortedTransaction
- extends
- Mapper<ImmutableBytesWritable, HCatRecord, WritableComparable<?>, Text> {
-
- @Override
- public void run(Context context) throws IOException,
- InterruptedException {
- setup(context);
- if (context.nextKeyValue()) {
- map(context.getCurrentKey(), context.getCurrentValue(), context);
- while (context.nextKeyValue()) {
- map(context.getCurrentKey(), context.getCurrentValue(),
- context);
- }
- throw new IOException("There should have been no records");
- }
- cleanup(context);
- }
+ }
- @Override
- public void map(ImmutableBytesWritable key, HCatRecord value,
- Context context) throws IOException, InterruptedException {
- System.out.println("HCat record value" + value.toString());
- }
+ static class MapReadAbortedTransaction
+ extends
+ Mapper<ImmutableBytesWritable, HCatRecord, WritableComparable<?>, Text> {
+
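+ // run() is overridden to fail the task if the input format returns any record at all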
+ @Override
+ public void run(Context context) throws IOException,
+ InterruptedException {
+ setup(context);
+ if (context.nextKeyValue()) {
+ map(context.getCurrentKey(), context.getCurrentValue(), context);
+ while (context.nextKeyValue()) {
+ map(context.getCurrentKey(), context.getCurrentValue(),
+ context);
+ }
+ throw new IOException("There should have been no records");
+ }
+ cleanup(context);
+ }
+
+ @Override
+ public void map(ImmutableBytesWritable key, HCatRecord value,
+ Context context) throws IOException, InterruptedException {
+ System.out.println("HCat record value" + value.toString());
}
+ }
}