Posted to hcatalog-commits@incubator.apache.org by ha...@apache.org on 2011/12/19 22:02:55 UTC

svn commit: r1220971 - in /incubator/hcatalog/trunk: CHANGES.txt src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java src/java/org/apache/hcatalog/mapreduce/Security.java

Author: hashutosh
Date: Mon Dec 19 22:02:54 2011
New Revision: 1220971

URL: http://svn.apache.org/viewvc?rev=1220971&view=rev
Log:
HCATALOG-10: Shouldn't assume the secure hadoop installation (julienledem via hashutosh)
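
The patch below probes for the security API via reflection before touching any
security-aware code, so the same HCatalog jar keeps working on Hadoop versions
that predate security. A minimal, self-contained sketch of that probe (the
surrounding class is illustrative, not part of the patch):

    import org.apache.hadoop.security.UserGroupInformation;

    public class SecurityProbe {
        // Returns true only on Hadoop releases whose UserGroupInformation
        // exposes isSecurityEnabled(), i.e. security-aware versions.
        public static boolean securitySupported() {
            try {
                UserGroupInformation.class.getMethod("isSecurityEnabled");
                return true;
            } catch (NoSuchMethodException e) {
                return false;
            }
        }
    }

Because the JVM resolves classes lazily, code that references security-only
types (here, the new Security class) is only reached after the probe succeeds.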

Added:
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/Security.java
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1220971&r1=1220970&r2=1220971&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Mon Dec 19 22:02:54 2011
@@ -50,6 +50,8 @@ Trunk (unreleased changes)
   HCAT-63. RPM package integration with Hadoop (khorgath via hashutosh)
 
   IMPROVEMENTS
+  HCAT-10. Shouldn't assume the secure hadoop installation (julienledem via hashutosh)
+
   HCAT-172. End-to-end test framework for HCatalog (daijyc via hashutosh)
 
   HCAT-158. Update HAR support to work with Hadoop 205 (thw via hashutosh)

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java?rev=1220971&r1=1220970&r2=1220971&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java Mon Dec 19 22:02:54 2011
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,18 +37,12 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenSelector;
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
@@ -61,10 +54,8 @@ import org.apache.hcatalog.data.schema.H
  * and should be given as null. The value is the HCatRecord to write.*/
 public class HCatOutputFormat extends HCatBaseOutputFormat {
 
-//    static final private Log LOG = LogFactory.getLog(HCatOutputFormat.class);
+    static final private Log LOG = LogFactory.getLog(HCatOutputFormat.class);
 
-    private static Map<String, Token<? extends AbstractDelegationTokenIdentifier>> tokenMap = new HashMap<String, Token<? extends AbstractDelegationTokenIdentifier>>();
-    
     private static int maxDynamicPartitions;
     private static boolean harRequested;
 
@@ -88,12 +79,12 @@ public class HCatOutputFormat extends HC
         if (table.getPartitionKeysSize() == 0 ){
           if ((outputJobInfo.getPartitionValues() != null) && (!outputJobInfo.getPartitionValues().isEmpty())){
             // attempt made to save partition values in non-partitioned table - throw error.
-            throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES, 
+            throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,
                 "Partition values specified for non-partitioned table");
           }
           // non-partitioned table
           outputJobInfo.setPartitionValues(new HashMap<String, String>());
-          
+
         } else {
           // partitioned table, we expect partition values
           // convert user specified map to have lower case key names
@@ -117,12 +108,12 @@ public class HCatOutputFormat extends HC
                 dynamicPartitioningKeys.add(fs.getName().toLowerCase());
               }
             }
-            
+
             if (valueMap.size() + dynamicPartitioningKeys.size() != table.getPartitionKeysSize()){
               // If this isn't equal, then bogus key values have been inserted, error out.
               throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,"Invalid partition keys specified");
             }
-                        
+
             outputJobInfo.setDynamicPartitioningKeys(dynamicPartitioningKeys);
             String dynHash;
             if ((dynHash = conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID)) == null){
@@ -177,79 +168,12 @@ public class HCatOutputFormat extends HC
         FsPermission.setUMask(conf, FsPermission.getDefault().applyUMask(
             tblPath.getFileSystem(conf).getFileStatus(tblPath).getPermission()));
 
-        if(UserGroupInformation.isSecurityEnabled()){
-          UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-          // check if oozie has set up a hcat deleg. token - if so use it
-          TokenSelector<? extends TokenIdentifier> tokenSelector = new DelegationTokenSelector();
-          // TODO: will oozie use a "service" called "oozie" - then instead of
-          // new Text() do new Text("oozie") below - if this change is made also
-          // remember to do:
-          //  job.getConfiguration().set(HCAT_KEY_TOKEN_SIGNATURE, "oozie");
-          // Also change code in OutputCommitter.cleanupJob() to cancel the
-          // token only if token.service is not "oozie" - remove the condition of
-          // HCAT_KEY_TOKEN_SIGNATURE != null in that code.
-          Token<? extends TokenIdentifier> token = tokenSelector.selectToken(
-              new Text(), ugi.getTokens());
-          if(token != null) {
-
-            job.getCredentials().addToken(new Text(ugi.getUserName()),token);
-
-          } else {
-
-            // we did not get token set up by oozie, let's get them ourselves here.
-            // we essentially get a token per unique Output HCatTableInfo - this is
-            // done because through Pig, setOutput() method is called multiple times
-            // We want to only get the token once per unique output HCatTableInfo -
-            // we cannot just get one token since in multi-query case (> 1 store in 1 job)
-            // or the case when a single pig script results in > 1 jobs, the single
-            // token will get cancelled by the output committer and the subsequent
-            // stores will fail - by tying the token with the concatenation of
-            // dbname, tablename and partition keyvalues of the output
-            // TableInfo, we can have as many tokens as there are stores and the TokenSelector
-            // will correctly pick the right tokens which the committer will use and
-            // cancel.
-            
-            String tokenSignature = getTokenSignature(outputJobInfo);
-            if(tokenMap.get(tokenSignature) == null) {
-              // get delegation tokens from hcat server and store them into the "job"
-              // These will be used in to publish partitions to
-              // hcat normally in OutputCommitter.commitJob()
-              // when the JobTracker in Hadoop MapReduce starts supporting renewal of 
-              // arbitrary tokens, the renewer should be the principal of the JobTracker
-              tokenMap.put(tokenSignature, HCatUtil.extractThriftToken(
-                  client.getDelegationToken(ugi.getUserName()),
-                  tokenSignature));
-            }
-
-            String jcTokenSignature = "jc."+tokenSignature;
-            if (harRequested){
-              if(tokenMap.get(jcTokenSignature) == null) {
-                tokenMap.put(jcTokenSignature,
-                    HCatUtil.getJobTrackerDelegationToken(conf,ugi.getUserName()));
-              }
-            }
-            
-            job.getCredentials().addToken(new Text(ugi.getUserName() + tokenSignature),
-                tokenMap.get(tokenSignature));
-            // this will be used by the outputcommitter to pass on to the metastore client
-            // which in turn will pass on to the TokenSelector so that it can select
-            // the right token.
-            job.getConfiguration().set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE, tokenSignature);
-
-            if (harRequested){
-              job.getCredentials().addToken(new Text(ugi.getUserName() + jcTokenSignature),
-                  tokenMap.get(jcTokenSignature));
-
-              job.getConfiguration().set(
-                  HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE, jcTokenSignature);
-              job.getConfiguration().set(
-                  HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM, 
-                  tokenMap.get(jcTokenSignature).encodeToUrlString());
-              //          LOG.info("Set hive dt["+tokenSignature+"]");
-              //          LOG.info("Set jt dt["+jcTokenSignature+"]");
-            }
-          }
-       }
+        try {
+          UserGroupInformation.class.getMethod("isSecurityEnabled");
+          Security.getInstance().handleSecurity(job, outputJobInfo, client, conf, harRequested);
+        } catch (NoSuchMethodException e) {
+          LOG.info("Security is not supported by this version of hadoop.");
+        }
       } catch(Exception e) {
         if( e instanceof HCatException ) {
           throw (HCatException) e;
@@ -264,27 +188,6 @@ public class HCatOutputFormat extends HC
       }
     }
 
-    // a signature string to associate with a HCatTableInfo - essentially
-    // a concatenation of dbname, tablename and partition keyvalues.
-    private static String getTokenSignature(OutputJobInfo outputJobInfo) {
-      StringBuilder result = new StringBuilder("");
-      String dbName = outputJobInfo.getDatabaseName();
-      if(dbName != null) {
-        result.append(dbName);
-      }
-      String tableName = outputJobInfo.getTableName();
-      if(tableName != null) {
-        result.append("+" + tableName);
-      }
-      Map<String, String> partValues = outputJobInfo.getPartitionValues();
-      if(partValues != null) {
-        for(Entry<String, String> entry: partValues.entrySet()) {
-          result.append("+" + entry.getKey() + "=" + entry.getValue());
-        }
-      }
-      return result.toString();
-    }
-
     /**
      * Set the schema for the data being written out to the partition. The
      * table schema is used by default for the partition if this is not called.
@@ -331,7 +234,13 @@ public class HCatOutputFormat extends HC
     static HiveMetaStoreClient createHiveClient(String url, Configuration conf) throws IOException, MetaException {
       HiveConf hiveConf = getHiveConf(url, conf);
 //      HCatUtil.logHiveConf(LOG, hiveConf);
-      return new HiveMetaStoreClient(hiveConf);
+      try {
+        return new HiveMetaStoreClient(hiveConf);
+      } catch (MetaException e) {
+        LOG.error("Error connecting to the metastore (conf follows): "+e.getMessage(), e);
+        HCatUtil.logHiveConf(LOG, hiveConf);
+        throw e;
+      }
     }
 
 
@@ -343,7 +252,7 @@ public class HCatOutputFormat extends HC
 
         hiveConf.set("hive.metastore.local", "false");
         hiveConf.set(ConfVars.METASTOREURIS.varname, url);
-        
+
         String kerberosPrincipal = conf.get(HCatConstants.HCAT_METASTORE_PRINCIPAL);
         if (kerberosPrincipal == null){
             kerberosPrincipal = conf.get(ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname);
@@ -351,7 +260,7 @@ public class HCatOutputFormat extends HC
         if (kerberosPrincipal != null){
             hiveConf.setBoolean(ConfVars.METASTORE_USE_THRIFT_SASL.varname, true);
             hiveConf.set(ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, kerberosPrincipal);
-        }        
+        }
         if(conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
           hiveConf.set("hive.metastore.token.signature", conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
         }
@@ -381,12 +290,12 @@ public class HCatOutputFormat extends HC
         }
 
       }
-      
+
       // figure out what the maximum number of partitions allowed is, so we can pass it on to our outputinfo
       if (HCatConstants.HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED){
         maxDynamicPartitions = hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS);
       }else{
-        maxDynamicPartitions = -1; // disables bounds checking for maximum number of dynamic partitions 
+        maxDynamicPartitions = -1; // disables bounds checking for maximum number of dynamic partitions
       }
       harRequested = hiveConf.getBoolVar(HiveConf.ConfVars.HIVEARCHIVEENABLED);
       return hiveConf;
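
The createHiveClient() change above follows a log-then-rethrow pattern: catch
the connection failure, dump the configuration that produced it, and rethrow
so callers still see the original exception. The same pattern in a
self-contained sketch, with a plain socket standing in for the metastore
client (all names here are illustrative, not HCatalog API):

    import java.io.IOException;
    import java.net.Socket;
    import java.util.Properties;

    public class ConnectWithDiagnostics {
        static Socket connect(Properties conf) throws IOException {
            try {
                return new Socket(conf.getProperty("host"),
                    Integer.parseInt(conf.getProperty("port")));
            } catch (IOException e) {
                // log the failure together with the configuration that
                // produced it, then rethrow the original exception
                System.err.println("Error connecting (conf follows): "
                    + e.getMessage());
                conf.list(System.err);
                throw e;
            }
        }
    }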

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/Security.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/Security.java?rev=1220971&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/Security.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/Security.java Mon Dec 19 22:02:54 2011
@@ -0,0 +1,138 @@
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.thrift.TException;
+
+final class Security {
+  
+  // making sure this is not initialized unless needed
+  private static final class LazyHolder {
+    public static final Security INSTANCE = new Security();
+  }
+
+  private static Map<String, Token<? extends AbstractDelegationTokenIdentifier>> tokenMap = new HashMap<String, Token<? extends AbstractDelegationTokenIdentifier>>();
+  
+  public static Security getInstance() {
+    return LazyHolder.INSTANCE;
+  }
+
+  // a signature string to associate with an HCatTableInfo - essentially
+  // a concatenation of dbname, tablename and partition keyvalues.
+  private String getTokenSignature(OutputJobInfo outputJobInfo) {
+    StringBuilder result = new StringBuilder("");
+    String dbName = outputJobInfo.getDatabaseName();
+    if(dbName != null) {
+      result.append(dbName);
+    }
+    String tableName = outputJobInfo.getTableName();
+    if(tableName != null) {
+      result.append("+" + tableName);
+    }
+    Map<String, String> partValues = outputJobInfo.getPartitionValues();
+    if(partValues != null) {
+      for(Entry<String, String> entry: partValues.entrySet()) {
+        result.append("+" + entry.getKey() + "=" + entry.getValue());
+      }
+    }
+    return result.toString();
+  }
+
+  void handleSecurity(
+      Job job, 
+      OutputJobInfo outputJobInfo,
+      HiveMetaStoreClient client, 
+      Configuration conf,
+      boolean harRequested)
+      throws IOException, MetaException, TException, Exception {
+    if(UserGroupInformation.isSecurityEnabled()){
+      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      // check if oozie has set up an hcat delegation token - if so, use it
+      TokenSelector<? extends TokenIdentifier> tokenSelector = new DelegationTokenSelector();
+      // TODO: will oozie use a "service" called "oozie" - then instead of
+      // new Text() do new Text("oozie") below - if this change is made also
+      // remember to do:
+      //  job.getConfiguration().set(HCAT_KEY_TOKEN_SIGNATURE, "oozie");
+      // Also change code in OutputCommitter.cleanupJob() to cancel the
+      // token only if token.service is not "oozie" - remove the condition of
+      // HCAT_KEY_TOKEN_SIGNATURE != null in that code.
+      Token<? extends TokenIdentifier> token = tokenSelector.selectToken(
+          new Text(), ugi.getTokens());
+      if(token != null) {
+  
+        job.getCredentials().addToken(new Text(ugi.getUserName()),token);
+  
+      } else {
+  
+        // we did not get a token set up by oozie, so let's get one ourselves.
+        // we essentially get one token per unique output HCatTableInfo, because
+        // through Pig the setOutput() method is called multiple times. We
+        // cannot just get one token: in the multi-query case (> 1 store in 1
+        // job), or when a single pig script results in > 1 jobs, that single
+        // token would get cancelled by the output committer and the subsequent
+        // stores would fail. By tying each token to the concatenation of
+        // dbname, tablename and partition keyvalues of the output TableInfo,
+        // we can have as many tokens as there are stores, and the
+        // TokenSelector will correctly pick the right token for the committer
+        // to use and cancel.
+        
+        String tokenSignature = getTokenSignature(outputJobInfo);
+        if(tokenMap.get(tokenSignature) == null) {
+          // get delegation tokens from the hcat server and store them into the "job".
+          // These will be used to publish partitions to hcat, normally in
+          // OutputCommitter.commitJob(). When the JobTracker in Hadoop MapReduce
+          // starts supporting renewal of arbitrary tokens, the renewer should be
+          // the principal of the JobTracker.
+          tokenMap.put(tokenSignature, HCatUtil.extractThriftToken(
+              client.getDelegationToken(ugi.getUserName()),
+              tokenSignature));
+        }
+  
+        String jcTokenSignature = "jc."+tokenSignature;
+        if (harRequested){
+          if(tokenMap.get(jcTokenSignature) == null) {
+            tokenMap.put(jcTokenSignature,
+                HCatUtil.getJobTrackerDelegationToken(conf,ugi.getUserName()));
+          }
+        }
+        
+        job.getCredentials().addToken(new Text(ugi.getUserName() + tokenSignature),
+            tokenMap.get(tokenSignature));
+        // this will be used by the outputcommitter to pass on to the metastore client
+        // which in turn will pass on to the TokenSelector so that it can select
+        // the right token.
+        job.getConfiguration().set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE, tokenSignature);
+  
+        if (harRequested){
+          job.getCredentials().addToken(new Text(ugi.getUserName() + jcTokenSignature),
+              tokenMap.get(jcTokenSignature));
+  
+          job.getConfiguration().set(
+              HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE, jcTokenSignature);
+          job.getConfiguration().set(
+              HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM, 
+              tokenMap.get(jcTokenSignature).encodeToUrlString());
+          //          LOG.info("Set hive dt["+tokenSignature+"]");
+          //          LOG.info("Set jt dt["+jcTokenSignature+"]");
+        }
+      }
+  }
+  }
+}
\ No newline at end of file
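
Two details of the new Security class are worth noting. getTokenSignature()
simply concatenates the output coordinates: for example, for database "mydb",
table "mytable" and partition values {ds=20111219} it yields
"mydb+mytable+ds=20111219", so each store in a multi-store job gets its own
token. And LazyHolder is the standard initialization-on-demand holder idiom:
HCatOutputFormat only references Security after the reflection probe succeeds,
and LazyHolder further defers constructing INSTANCE until getInstance() is
first called, with class loading giving thread safety for free. A generic
sketch of the idiom (names illustrative, not HCatalog code):

    public final class Singleton {
        private Singleton() { }

        private static final class Holder {
            // runs only when Holder is first loaded, i.e. on the first
            // call to getInstance(); class loading is thread-safe
            static final Singleton INSTANCE = new Singleton();
        }

        public static Singleton getInstance() {
            return Holder.INSTANCE;
        }
    }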