You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by kh...@apache.org on 2011/12/08 22:35:29 UTC
svn commit: r1212174 - in /incubator/hcatalog/trunk: ./
storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/
storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/
Author: khorgath
Date: Thu Dec 8 22:35:28 2011
New Revision: 1212174
URL: http://svn.apache.org/viewvc?rev=1212174&view=rev
Log:
HCATALOG-171 HBaseBulkOutputStorageDriver should work with secured hadoop (toffer via khorgath)
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1212174&r1=1212173&r2=1212174&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Thu Dec 8 22:35:28 2011
@@ -81,6 +81,8 @@ Trunk (unreleased changes)
OPTIMIZATIONS
BUG FIXES
+ HCAT-171. HBaseBulkOutputStorageDriver should work with secured hadoop (toffer via khorgath)
+
HCAT-170. HBaseBulkOSD fails to launch ImportSequenceFile because of missing jars in dist cache (toffer via khorgath)
HCAT-176. Class not found exception when running TestPigStorageDriver (daijy via khorgath)
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java?rev=1212174&r1=1212173&r2=1212174&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java Thu Dec 8 22:35:28 2011
@@ -38,7 +38,6 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.UUID;
/**
* Base class share by both {@link HBaseBulkOutputStorageDriver} and {@link HBaseDirectOutputStorageDriver}
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java?rev=1212174&r1=1212173&r2=1212174&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java Thu Dec 8 22:35:28 2011
@@ -5,7 +5,10 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -33,6 +36,12 @@ class HBaseBulkOutputFormat extends Outp
@Override
public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
baseOutputFormat.checkOutputSpecs(context);
+ //Get jobTracker delegation token if security is enabled
+ //we need to launch the ImportSequenceFile job
+ if(context.getConfiguration().getBoolean("hadoop.security.authorization",false)) {
+ JobClient jobClient = new JobClient(new JobConf(context.getConfiguration()));
+ context.getCredentials().addToken(new Text("my mr token"), jobClient.getDelegationToken(null));
+ }
}
@Override
@@ -124,7 +133,7 @@ class HBaseBulkOutputFormat extends Outp
Configuration conf = jobContext.getConfiguration();
Path srcPath = FileOutputFormat.getOutputPath(jobContext);
Path destPath = new Path(srcPath.getParent(),srcPath.getName()+"_hfiles");
- ImportSequenceFile.runJob(conf,
+ ImportSequenceFile.runJob(jobContext,
conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY),
srcPath,
destPath);
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java?rev=1212174&r1=1212173&r2=1212174&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java Thu Dec 8 22:35:28 2011
@@ -18,7 +18,10 @@
package org.apache.hcatalog.hbase;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -52,13 +55,27 @@ public class HBaseBulkOutputStorageDrive
//initialize() gets called multiple time in the lifecycle of an MR job, client, mapper, reducer, etc
//depending on the case we have to make sure for some context variables we set here that they don't get set again
if(!outputJobInfo.getProperties().containsKey(PROPERTY_INT_OUTPUT_LOCATION)) {
- String location = new Path(context.getConfiguration().get(PROPERTY_TABLE_LOCATION),
+ String tableLocation = context.getConfiguration().get(PROPERTY_TABLE_LOCATION);
+ String location = new Path(tableLocation,
"REVISION_"+outputJobInfo.getProperties()
.getProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY)).toString();
outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION, location);
//We are writing out an intermediate sequenceFile hence location is not passed in OutputJobInfo.getLocation()
//TODO replace this with a mapreduce constant when available
context.getConfiguration().set("mapred.output.dir", location);
+ //Temporary fix until support for secure hbase is available
+ //We need the intermediate directory to be world readable
+ //so that the hbase user can import the generated hfiles
+ if(context.getConfiguration().getBoolean("hadoop.security.authorization",false)) {
+ Path p = new Path(tableLocation);
+ FileSystem fs = FileSystem.get(context.getConfiguration());
+ fs.setPermission(new Path(tableLocation),
+ FsPermission.valueOf("drwx--x--x"));
+ while((p = p.getParent()) != null) {
+ if(!fs.getFileStatus(p).getPermission().getOtherAction().implies(FsAction.EXECUTE))
+ throw new IOException("Table's parent directories must at least have global execute permissions.");
+ }
+ }
}
outputFormat = new HBaseBulkOutputFormat();
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java?rev=1212174&r1=1212173&r2=1212174&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java Thu Dec 8 22:35:28 2011
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
@@ -186,14 +187,15 @@ class ImportSequenceFile {
/**
* Method to run the Importer MapReduce Job. Normally will be called by another MR job
- * during OutputCommitter.commitJob(). It wil inherit
- * @param parentConf configuration of the parent job
+ * during OutputCommitter.commitJob().
+ * @param parentContext JobContext of the parent job
* @param tableName name of table to bulk load data into
* @param InputDir path of SequenceFile formatted data to read
* @param scratchDir temporary path for the Importer MR job to build the HFiles which will be imported
* @return
*/
- static boolean runJob(Configuration parentConf, String tableName, Path InputDir, Path scratchDir) {
+ static boolean runJob(JobContext parentContext, String tableName, Path InputDir, Path scratchDir) {
+ Configuration parentConf = parentContext.getConfiguration();
Configuration conf = new Configuration();
for(Map.Entry<String,String> el: parentConf) {
if(el.getKey().startsWith("hbase."))
@@ -206,6 +208,13 @@ class ImportSequenceFile {
conf.set("mapred.job.classpath.archives",parentConf.get("mapred.job.classpath.archives", ""));
conf.set("mapreduce.job.cache.archives.visibilities",parentConf.get("mapreduce.job.cache.archives.visibilities",""));
+ //Temporary fix until hbase security is ready
+ //We need the written HFile to be world readable so
+ //hbase regionserver user has the privileges to perform a hdfs move
+ if(parentConf.getBoolean("hadoop.security.authorization", false)) {
+ FsPermission.setUMask(conf, FsPermission.valueOf("----------"));
+ }
+
conf.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
boolean localMode = "local".equals(conf.get("mapred.job.tracker"));
@@ -218,6 +227,7 @@ class ImportSequenceFile {
throw new IOException("Importer work directory already exists: "+workDir);
Job job = createSubmittableJob(conf, tableName, InputDir, scratchDir, localMode);
job.setWorkingDirectory(workDir);
+ job.getCredentials().addAll(parentContext.getCredentials());
success = job.waitForCompletion(true);
fs.delete(workDir, true);
//We only cleanup on success because failure might've been caused by existence of target directory
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java?rev=1212174&r1=1212173&r2=1212174&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java Thu Dec 8 22:35:28 2011
@@ -1,5 +1,7 @@
package org.apache.hcatalog.hbase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -30,6 +32,7 @@ import org.apache.hcatalog.data.HCatReco
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.OutputJobInfo;
+
import org.junit.Test;
import java.io.IOException;
@@ -44,6 +47,8 @@ import static org.junit.Assert.assertTru
* Including ImportSequenceFile, HBaseOutputStorageDrivers and HBaseBulkOutputFormat
*/
public class TestHBaseBulkOutputStorageDriver extends SkeletonHBaseTest {
+ private final static Log LOG = LogFactory.getLog(TestHBaseBulkOutputStorageDriver.class);
+
private final HiveConf allConf;
private final HCatDriver hcatDriver;
@@ -65,6 +70,7 @@ public class TestHBaseBulkOutputStorageD
}
public static class MapWrite extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
+
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String vals[] = value.toString().split(",");
@@ -99,6 +105,7 @@ public class TestHBaseBulkOutputStorageD
public void hbaseBulkOutputFormatTest() throws IOException, ClassNotFoundException, InterruptedException {
String testName = "hbaseBulkOutputFormatTest";
Path methodTestDir = new Path(getTestDir(),testName);
+ LOG.info("starting: "+testName);
String tableName = newTableName(testName).toLowerCase();
byte[] tableNameBytes = Bytes.toBytes(tableName);
@@ -120,15 +127,14 @@ public class TestHBaseBulkOutputStorageD
// input/output settings
Path inputPath = new Path(methodTestDir,"mr_input");
- getFileSystem().mkdirs(inputPath);
FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
for(String line: data)
os.write(Bytes.toBytes(line + "\n"));
os.close();
Path interPath = new Path(methodTestDir,"inter");
-
//create job
Job job = new Job(conf, testName);
+ HBaseHCatStorageHandler.addDependencyJars(job.getConfiguration());
job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
job.setJarByClass(this.getClass());
job.setMapperClass(MapWrite.class);
@@ -174,6 +180,7 @@ public class TestHBaseBulkOutputStorageD
public void importSequenceFileTest() throws IOException, ClassNotFoundException, InterruptedException {
String testName = "importSequenceFileTest";
Path methodTestDir = new Path(getTestDir(),testName);
+ LOG.info("starting: "+testName);
String tableName = newTableName(testName).toLowerCase();
byte[] tableNameBytes = Bytes.toBytes(tableName);
@@ -205,6 +212,7 @@ public class TestHBaseBulkOutputStorageD
//create job
Job job = new Job(conf, testName);
+ HBaseHCatStorageHandler.addDependencyJars(job.getConfiguration());
job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
job.setJarByClass(this.getClass());
job.setMapperClass(MapWrite.class);
@@ -225,7 +233,9 @@ public class TestHBaseBulkOutputStorageD
assertTrue(job.waitForCompletion(true));
- assertTrue(ImportSequenceFile.runJob(job.getConfiguration(),tableName,interPath,scratchPath));
+ job = new Job(new Configuration(allConf),testName+"_importer");
+ HBaseHCatStorageHandler.addDependencyJars(job.getConfiguration());
+ assertTrue(ImportSequenceFile.runJob(job, tableName, interPath, scratchPath));
//verify
HTable table = new HTable(conf, tableName);
@@ -252,6 +262,7 @@ public class TestHBaseBulkOutputStorageD
public void hbaseBulkOutputStorageDriverTest() throws Exception {
String testName = "hbaseBulkOutputStorageDriverTest";
Path methodTestDir = new Path(getTestDir(),testName);
+ LOG.info("starting: "+testName);
String databaseName = testName.toLowerCase();
String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
@@ -291,6 +302,7 @@ public class TestHBaseBulkOutputStorageD
//create job
Job job = new Job(conf,testName);
+ HBaseHCatStorageHandler.addDependencyJars(job.getConfiguration());
job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
job.setJarByClass(this.getClass());
job.setMapperClass(MapHCatWrite.class);
@@ -355,6 +367,7 @@ public class TestHBaseBulkOutputStorageD
public void hbaseBulkOutputStorageDriverTestWithRevision() throws Exception {
String testName = "hbaseBulkOutputStorageDriverTestWithRevision";
Path methodTestDir = new Path(getTestDir(),testName);
+ LOG.info("starting: "+testName);
String databaseName = testName.toLowerCase();
String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
@@ -394,6 +407,7 @@ public class TestHBaseBulkOutputStorageD
//create job
Job job = new Job(conf,testName);
+ HBaseHCatStorageHandler.addDependencyJars(job.getConfiguration());
job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
job.setJarByClass(this.getClass());
job.setMapperClass(MapHCatWrite.class);
@@ -459,7 +473,8 @@ public class TestHBaseBulkOutputStorageD
String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
"(key int, english string, spanish string) STORED BY " +
"'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
- "TBLPROPERTIES ('hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')" ;
+ "TBLPROPERTIES (" +
+ "'hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')" ;
assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
@@ -520,3 +535,4 @@ public class TestHBaseBulkOutputStorageD
}
}
+