Posted to hcatalog-commits@incubator.apache.org by kh...@apache.org on 2011/12/08 22:35:29 UTC

svn commit: r1212174 - in /incubator/hcatalog/trunk: ./ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/

Author: khorgath
Date: Thu Dec  8 22:35:28 2011
New Revision: 1212174

URL: http://svn.apache.org/viewvc?rev=1212174&view=rev
Log:
HCATALOG-171 HBaseBulkOutputStorageDriver should work with secured hadoop (toffer via khorgath)
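
This fix has three coordinated parts, visible in the hunks below: HBaseBulkOutputFormat obtains a JobTracker delegation token while the submitting client's credentials are still at hand; HBaseBulkOutputStorageDriver makes the table directory traversable so the hbase user can reach the generated HFiles; and ImportSequenceFile clears the umask for its output and inherits the parent job's credentials so the importer can run as a child job on a secured cluster.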

Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1212174&r1=1212173&r2=1212174&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Thu Dec  8 22:35:28 2011
@@ -81,6 +81,8 @@ Trunk (unreleased changes)
   OPTIMIZATIONS
 
   BUG FIXES
+  HCAT-171. HBaseBulkOutputStorageDriver should work with secured hadoop (toffer via khorgath)
+
   HCAT-170. HBaseBulkOSD fails to launch ImportSequenceFile because of missing jars in dist cache (toffer via khorgath)
 
   HCAT-176. Class not found exception when running TestPigStorageDriver (daijy via khorgath)

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java?rev=1212174&r1=1212173&r2=1212174&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java Thu Dec  8 22:35:28 2011
@@ -38,7 +38,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.UUID;
 
 /**
  * Base class shared by both {@link HBaseBulkOutputStorageDriver} and {@link HBaseDirectOutputStorageDriver}

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java?rev=1212174&r1=1212173&r2=1212174&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java Thu Dec  8 22:35:28 2011
@@ -5,7 +5,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -33,6 +36,12 @@ class HBaseBulkOutputFormat extends Outp
     @Override
     public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
         baseOutputFormat.checkOutputSpecs(context);
+        //Get a jobTracker delegation token if security is enabled;
+        //we need it to launch the ImportSequenceFile job
+        if(context.getConfiguration().getBoolean("hadoop.security.authorization",false)) {
+            JobClient jobClient = new JobClient(new JobConf(context.getConfiguration()));
+            context.getCredentials().addToken(new Text("my mr token"), jobClient.getDelegationToken(null));
+        }
     }
 
     @Override
@@ -124,7 +133,7 @@ class HBaseBulkOutputFormat extends Outp
                 Configuration conf = jobContext.getConfiguration();
                 Path srcPath = FileOutputFormat.getOutputPath(jobContext);
                 Path destPath = new Path(srcPath.getParent(),srcPath.getName()+"_hfiles");
-                ImportSequenceFile.runJob(conf,
+                ImportSequenceFile.runJob(jobContext,
                                                         conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY),
                                                         srcPath,
                                                         destPath);
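
The checkOutputSpecs() change above is the heart of the secure-mode support: the client side of the outer job, which still holds Kerberos credentials, obtains a JobTracker delegation token and stashes it in the job's Credentials, where commitJob() (running without a Kerberos ticket) can later use it to submit the ImportSequenceFile job. A minimal standalone sketch of that pattern follows; the helper class/method names and the token alias are illustrative, not part of the patch.

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.JobContext;

    class DelegationTokenSketch {
        //Fetch a JobTracker delegation token while Kerberos credentials are
        //still available, so a follow-on job can be submitted later from the
        //OutputCommitter.
        static void obtainJobTrackerToken(JobContext context)
                throws IOException, InterruptedException {
            if (context.getConfiguration().getBoolean("hadoop.security.authorization", false)) {
                JobClient jobClient = new JobClient(new JobConf(context.getConfiguration()));
                //the alias only needs to be unique within this job's credentials
                context.getCredentials().addToken(new Text("importer jt token"),
                        jobClient.getDelegationToken(null));
            }
        }
    }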

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java?rev=1212174&r1=1212173&r2=1212174&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java Thu Dec  8 22:35:28 2011
@@ -18,7 +18,10 @@
 
 package org.apache.hcatalog.hbase;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -52,13 +55,27 @@ public class HBaseBulkOutputStorageDrive
         //initialize() gets called multiple times in the lifecycle of an MR job: client, mapper, reducer, etc.
         //depending on the case, we have to make sure that some of the context variables set here don't get set again
         if(!outputJobInfo.getProperties().containsKey(PROPERTY_INT_OUTPUT_LOCATION)) {
-            String location = new  Path(context.getConfiguration().get(PROPERTY_TABLE_LOCATION),
+            String tableLocation = context.getConfiguration().get(PROPERTY_TABLE_LOCATION);
+            String location = new  Path(tableLocation,
                                                     "REVISION_"+outputJobInfo.getProperties()
                                                                                                .getProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY)).toString();
             outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION, location);
             //We are writing out an intermediate sequenceFile, hence the location is not passed in OutputJobInfo.getLocation()
             //TODO replace this with a mapreduce constant when available
             context.getConfiguration().set("mapred.output.dir", location);
+            //Temporary fix until support for secure hbase is available
+            //We need the intermediate directory to be world accessible
+            //so that the hbase user can import the generated hfiles
+            if(context.getConfiguration().getBoolean("hadoop.security.authorization",false)) {
+                Path p = new Path(tableLocation);
+                FileSystem fs = FileSystem.get(context.getConfiguration());
+                fs.setPermission(new Path(tableLocation),
+                                        FsPermission.valueOf("drwx--x--x"));
+                while((p = p.getParent()) != null) {
+                    if(!fs.getFileStatus(p).getPermission().getOtherAction().implies(FsAction.EXECUTE))
+                        throw new IOException("Table's parent directories must at least have global execute permissions.");
+                }
+            }
         }
 
         outputFormat = new HBaseBulkOutputFormat();
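
The permission handling above has two parts: the table directory itself is set to drwx--x--x so other users can descend into it, and every ancestor is checked for EXECUTE access by "other", since a single non-traversable ancestor would still keep the hbase user away from the generated HFiles. A self-contained sketch of the ancestor walk, under the same assumptions (the helper name is illustrative):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsAction;

    class TraversalCheckSketch {
        //Walk from the table directory up to the filesystem root and fail
        //fast if any ancestor denies EXECUTE to "other" users.
        static void assertAncestorsTraversable(Configuration conf, Path tableLocation)
                throws IOException {
            FileSystem fs = FileSystem.get(conf);
            for (Path p = tableLocation.getParent(); p != null; p = p.getParent()) {
                if (!fs.getFileStatus(p).getPermission().getOtherAction().implies(FsAction.EXECUTE)) {
                    throw new IOException("Table's parent directories must at least have global execute permissions.");
                }
            }
        }
    }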

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java?rev=1212174&r1=1212173&r2=1212174&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java Thu Dec  8 22:35:28 2011
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
@@ -186,14 +187,15 @@ class ImportSequenceFile {
 
     /**
      * Method to run the Importer MapReduce Job. Normally will be called by another MR job
-     * during OutputCommitter.commitJob(). It wil inherit
-      * @param parentConf configuration of the parent job
+     * during OutputCommitter.commitJob().
+      * @param parentContext JobContext of the parent job
      * @param tableName name of table to bulk load data into
      * @param InputDir path of SequenceFile formatted data to read
      * @param scratchDir temporary path for the Importer MR job to build the HFiles which will be imported
      * @return true if the import job completed successfully
      */
-    static boolean runJob(Configuration parentConf, String tableName, Path InputDir, Path scratchDir) {
+    static boolean runJob(JobContext parentContext, String tableName, Path InputDir, Path scratchDir) {
+        Configuration parentConf = parentContext.getConfiguration();
         Configuration conf = new Configuration();
         for(Map.Entry<String,String> el: parentConf) {
             if(el.getKey().startsWith("hbase."))
@@ -206,6 +208,13 @@ class ImportSequenceFile {
         conf.set("mapred.job.classpath.archives",parentConf.get("mapred.job.classpath.archives", ""));
         conf.set("mapreduce.job.cache.archives.visibilities",parentConf.get("mapreduce.job.cache.archives.visibilities",""));
 
+        //Temporary fix until hbase security is ready
+        //We need the written HFiles to be world readable so that
+        //the hbase regionserver user has the privileges to perform an HDFS move
+        if(parentConf.getBoolean("hadoop.security.authorization", false)) {
+            FsPermission.setUMask(conf, FsPermission.valueOf("----------"));
+        }
+
         conf.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
 
         boolean localMode = "local".equals(conf.get("mapred.job.tracker"));
@@ -218,6 +227,7 @@ class ImportSequenceFile {
                 throw new IOException("Importer work directory already exists: "+workDir);
             Job job = createSubmittableJob(conf, tableName, InputDir, scratchDir, localMode);
             job.setWorkingDirectory(workDir);
+            job.getCredentials().addAll(parentContext.getCredentials());
             success = job.waitForCompletion(true);
             fs.delete(workDir, true);
             //We only clean up on success because failure might've been caused by the existence of the target directory
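
Two secure-mode details in the importer above are easy to miss: FsPermission.setUMask(conf, FsPermission.valueOf("----------")) clears the umask to 000 so the generated HFiles keep their full default permissions (world readable), and job.getCredentials().addAll(parentContext.getCredentials()) hands the parent's delegation tokens to the child job. A condensed sketch of that child-job setup (the helper name is hypothetical):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.JobContext;

    class ImporterJobSketch {
        //Build a child Job whose output files stay world readable and which
        //carries the parent's delegation tokens.
        static Job newChildJob(JobContext parent, Configuration childConf, String name)
                throws IOException {
            if (parent.getConfiguration().getBoolean("hadoop.security.authorization", false)) {
                //umask "----------" is 000: nothing is masked off newly created files
                FsPermission.setUMask(childConf, FsPermission.valueOf("----------"));
            }
            Job job = new Job(childConf, name);
            job.getCredentials().addAll(parent.getCredentials());
            return job;
        }
    }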

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java?rev=1212174&r1=1212173&r2=1212174&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java Thu Dec  8 22:35:28 2011
@@ -1,5 +1,7 @@
 package org.apache.hcatalog.hbase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -30,6 +32,7 @@ import org.apache.hcatalog.data.HCatReco
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
+
 import org.junit.Test;
 
 import java.io.IOException;
@@ -44,6 +47,8 @@ import static org.junit.Assert.assertTru
  * Including ImportSequenceFile, HBaseOutputStorageDrivers and HBaseBulkOutputFormat
  */
 public class TestHBaseBulkOutputStorageDriver extends SkeletonHBaseTest {
+    private final static Log LOG = LogFactory.getLog(TestHBaseBulkOutputStorageDriver.class);
+
     private final HiveConf allConf;
     private final HCatDriver hcatDriver;
 
@@ -65,6 +70,7 @@ public class TestHBaseBulkOutputStorageD
     }
 
     public static class MapWrite extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
+
         @Override
         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
             String vals[] = value.toString().split(",");
@@ -99,6 +105,7 @@ public class TestHBaseBulkOutputStorageD
     public void hbaseBulkOutputFormatTest() throws IOException, ClassNotFoundException, InterruptedException {
         String testName = "hbaseBulkOutputFormatTest";
         Path methodTestDir = new Path(getTestDir(),testName);
+        LOG.info("starting: "+testName);
 
         String tableName = newTableName(testName).toLowerCase();
         byte[] tableNameBytes = Bytes.toBytes(tableName);
@@ -120,15 +127,14 @@ public class TestHBaseBulkOutputStorageD
 
         // input/output settings
         Path inputPath = new Path(methodTestDir,"mr_input");
-        getFileSystem().mkdirs(inputPath);
         FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
         for(String line: data)
             os.write(Bytes.toBytes(line + "\n"));
         os.close();
         Path interPath = new Path(methodTestDir,"inter");
-
         //create job
         Job job = new Job(conf, testName);
+        HBaseHCatStorageHandler.addDependencyJars(job.getConfiguration());
         job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
         job.setJarByClass(this.getClass());
         job.setMapperClass(MapWrite.class);
@@ -174,6 +180,7 @@ public class TestHBaseBulkOutputStorageD
     public void importSequenceFileTest() throws IOException, ClassNotFoundException, InterruptedException {
         String testName = "importSequenceFileTest";
         Path methodTestDir = new Path(getTestDir(),testName);
+        LOG.info("starting: "+testName);
 
         String tableName = newTableName(testName).toLowerCase();
         byte[] tableNameBytes = Bytes.toBytes(tableName);
@@ -205,6 +212,7 @@ public class TestHBaseBulkOutputStorageD
 
         //create job
         Job job = new Job(conf, testName);
+        HBaseHCatStorageHandler.addDependencyJars(job.getConfiguration());
         job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
         job.setJarByClass(this.getClass());
         job.setMapperClass(MapWrite.class);
@@ -225,7 +233,9 @@ public class TestHBaseBulkOutputStorageD
 
         assertTrue(job.waitForCompletion(true));
 
-        assertTrue(ImportSequenceFile.runJob(job.getConfiguration(),tableName,interPath,scratchPath));
+        job = new Job(new Configuration(allConf),testName+"_importer");
+        HBaseHCatStorageHandler.addDependencyJars(job.getConfiguration());
+        assertTrue(ImportSequenceFile.runJob(job, tableName, interPath, scratchPath));
 
         //verify
         HTable table = new HTable(conf, tableName);
@@ -252,6 +262,7 @@ public class TestHBaseBulkOutputStorageD
     public void hbaseBulkOutputStorageDriverTest() throws Exception {
         String testName = "hbaseBulkOutputStorageDriverTest";
         Path methodTestDir = new Path(getTestDir(),testName);
+        LOG.info("starting: "+testName);
 
         String databaseName = testName.toLowerCase();
         String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
@@ -291,6 +302,7 @@ public class TestHBaseBulkOutputStorageD
 
         //create job
         Job job = new Job(conf,testName);
+        HBaseHCatStorageHandler.addDependencyJars(job.getConfiguration());
         job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
         job.setJarByClass(this.getClass());
         job.setMapperClass(MapHCatWrite.class);
@@ -355,6 +367,7 @@ public class TestHBaseBulkOutputStorageD
     public void hbaseBulkOutputStorageDriverTestWithRevision() throws Exception {
         String testName = "hbaseBulkOutputStorageDriverTestWithRevision";
         Path methodTestDir = new Path(getTestDir(),testName);
+        LOG.info("starting: "+testName);
 
         String databaseName = testName.toLowerCase();
         String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
@@ -394,6 +407,7 @@ public class TestHBaseBulkOutputStorageD
 
         //create job
         Job job = new Job(conf,testName);
+        HBaseHCatStorageHandler.addDependencyJars(job.getConfiguration());
         job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
         job.setJarByClass(this.getClass());
         job.setMapperClass(MapHCatWrite.class);
@@ -459,7 +473,8 @@ public class TestHBaseBulkOutputStorageD
         String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
                               "(key int, english string, spanish string) STORED BY " +
                               "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
-                              "TBLPROPERTIES ('hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')" ;
+                              "TBLPROPERTIES (" +
+                              "'hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')" ;
 
         assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
         assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
@@ -520,3 +535,4 @@ public class TestHBaseBulkOutputStorageD
     }
 
 }
+
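
A note on the test changes: runJob() now takes a JobContext rather than a bare Configuration, and since Job extends JobContext the tests simply wrap a fresh Configuration in a throwaway Job and pass that through; each test job also calls HBaseHCatStorageHandler.addDependencyJars() so the importer's jars land in the distributed cache (the HCAT-170 fix). The new calling convention, as exercised by importSequenceFileTest():

    Job importer = new Job(new Configuration(allConf), testName + "_importer");
    HBaseHCatStorageHandler.addDependencyJars(importer.getConfiguration());
    assertTrue(ImportSequenceFile.runJob(importer, tableName, interPath, scratchPath));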