Posted to hcatalog-commits@incubator.apache.org by ha...@apache.org on 2011/12/19 22:02:55 UTC
svn commit: r1220971 - in /incubator/hcatalog/trunk: CHANGES.txt src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java src/java/org/apache/hcatalog/mapreduce/Security.java
Author: hashutosh
Date: Mon Dec 19 22:02:54 2011
New Revision: 1220971
URL: http://svn.apache.org/viewvc?rev=1220971&view=rev
Log:
HCATALOG-10: Shouldn't assume the secure hadoop installation (julienledem via hashutosh)
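In short, the delegation-token handling is moved out of HCatOutputFormat into a new Security class, and setOutput() now probes for the security API via reflection before touching it: UserGroupInformation.class.getMethod("isSecurityEnabled") throws NoSuchMethodException on Hadoop versions that lack security support, in which case the security path is skipped and the Security class (with its token/DelegationTokenSelector imports) is never class-loaded. A minimal sketch of the probe pattern follows; SecurityProbe is an illustrative stand-in, not part of the patch:

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.security.UserGroupInformation;

    public final class SecurityProbe {

        private static final Log LOG = LogFactory.getLog(SecurityProbe.class);

        /** Returns true only when the running Hadoop exposes the security API. */
        static boolean isSecurityApiAvailable() {
            try {
                // Older Hadoop versions do not have this method; the probe fails
                // fast there, so callers can skip the security-specific code path.
                UserGroupInformation.class.getMethod("isSecurityEnabled");
                return true;
            } catch (NoSuchMethodException e) {
                LOG.info("Security is not supported by this version of hadoop.");
                return false;
            }
        }
    }

In the patch itself the probe and the call to Security.getInstance().handleSecurity(job, outputJobInfo, client, conf, harRequested) sit directly in a try block inside setOutput(), as shown in the HCatOutputFormat.java diff below.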
Added:
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/Security.java
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1220971&r1=1220970&r2=1220971&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Mon Dec 19 22:02:54 2011
@@ -50,6 +50,8 @@ Trunk (unreleased changes)
HCAT-63. RPM package integration with Hadoop (khorgath via hashutosh)
IMPROVEMENTS
+ HCAT-10. Shouldn't assume the secure hadoop installation (julienledem via hashutosh)
+
HCAT-172. End-to-end test framework for HCatalog (daijyc via hashutosh)
HCAT-158. Update HAR support to work with Hadoop 205 (thw via hashutosh)
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java?rev=1220971&r1=1220970&r2=1220971&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java Mon Dec 19 22:02:54 2011
@@ -24,7 +24,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -38,18 +37,12 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenSelector;
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hcatalog.common.ErrorType;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatException;
@@ -61,10 +54,8 @@ import org.apache.hcatalog.data.schema.H
* and should be given as null. The value is the HCatRecord to write.*/
public class HCatOutputFormat extends HCatBaseOutputFormat {
-// static final private Log LOG = LogFactory.getLog(HCatOutputFormat.class);
+ static final private Log LOG = LogFactory.getLog(HCatOutputFormat.class);
- private static Map<String, Token<? extends AbstractDelegationTokenIdentifier>> tokenMap = new HashMap<String, Token<? extends AbstractDelegationTokenIdentifier>>();
-
private static int maxDynamicPartitions;
private static boolean harRequested;
@@ -88,12 +79,12 @@ public class HCatOutputFormat extends HC
if (table.getPartitionKeysSize() == 0 ){
if ((outputJobInfo.getPartitionValues() != null) && (!outputJobInfo.getPartitionValues().isEmpty())){
// attempt made to save partition values in non-partitioned table - throw error.
- throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,
+ throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,
"Partition values specified for non-partitioned table");
}
// non-partitioned table
outputJobInfo.setPartitionValues(new HashMap<String, String>());
-
+
} else {
// partitioned table, we expect partition values
// convert user specified map to have lower case key names
@@ -117,12 +108,12 @@ public class HCatOutputFormat extends HC
dynamicPartitioningKeys.add(fs.getName().toLowerCase());
}
}
-
+
if (valueMap.size() + dynamicPartitioningKeys.size() != table.getPartitionKeysSize()){
// If this isn't equal, then bogus key values have been inserted, error out.
throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,"Invalid partition keys specified");
}
-
+
outputJobInfo.setDynamicPartitioningKeys(dynamicPartitioningKeys);
String dynHash;
if ((dynHash = conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID)) == null){
@@ -177,79 +168,12 @@ public class HCatOutputFormat extends HC
FsPermission.setUMask(conf, FsPermission.getDefault().applyUMask(
tblPath.getFileSystem(conf).getFileStatus(tblPath).getPermission()));
- if(UserGroupInformation.isSecurityEnabled()){
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
- // check if oozie has set up a hcat deleg. token - if so use it
- TokenSelector<? extends TokenIdentifier> tokenSelector = new DelegationTokenSelector();
- // TODO: will oozie use a "service" called "oozie" - then instead of
- // new Text() do new Text("oozie") below - if this change is made also
- // remember to do:
- // job.getConfiguration().set(HCAT_KEY_TOKEN_SIGNATURE, "oozie");
- // Also change code in OutputCommitter.cleanupJob() to cancel the
- // token only if token.service is not "oozie" - remove the condition of
- // HCAT_KEY_TOKEN_SIGNATURE != null in that code.
- Token<? extends TokenIdentifier> token = tokenSelector.selectToken(
- new Text(), ugi.getTokens());
- if(token != null) {
-
- job.getCredentials().addToken(new Text(ugi.getUserName()),token);
-
- } else {
-
- // we did not get token set up by oozie, let's get them ourselves here.
- // we essentially get a token per unique Output HCatTableInfo - this is
- // done because through Pig, setOutput() method is called multiple times
- // We want to only get the token once per unique output HCatTableInfo -
- // we cannot just get one token since in multi-query case (> 1 store in 1 job)
- // or the case when a single pig script results in > 1 jobs, the single
- // token will get cancelled by the output committer and the subsequent
- // stores will fail - by tying the token with the concatenation of
- // dbname, tablename and partition keyvalues of the output
- // TableInfo, we can have as many tokens as there are stores and the TokenSelector
- // will correctly pick the right tokens which the committer will use and
- // cancel.
-
- String tokenSignature = getTokenSignature(outputJobInfo);
- if(tokenMap.get(tokenSignature) == null) {
- // get delegation tokens from hcat server and store them into the "job"
- // These will be used in to publish partitions to
- // hcat normally in OutputCommitter.commitJob()
- // when the JobTracker in Hadoop MapReduce starts supporting renewal of
- // arbitrary tokens, the renewer should be the principal of the JobTracker
- tokenMap.put(tokenSignature, HCatUtil.extractThriftToken(
- client.getDelegationToken(ugi.getUserName()),
- tokenSignature));
- }
-
- String jcTokenSignature = "jc."+tokenSignature;
- if (harRequested){
- if(tokenMap.get(jcTokenSignature) == null) {
- tokenMap.put(jcTokenSignature,
- HCatUtil.getJobTrackerDelegationToken(conf,ugi.getUserName()));
- }
- }
-
- job.getCredentials().addToken(new Text(ugi.getUserName() + tokenSignature),
- tokenMap.get(tokenSignature));
- // this will be used by the outputcommitter to pass on to the metastore client
- // which in turn will pass on to the TokenSelector so that it can select
- // the right token.
- job.getConfiguration().set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE, tokenSignature);
-
- if (harRequested){
- job.getCredentials().addToken(new Text(ugi.getUserName() + jcTokenSignature),
- tokenMap.get(jcTokenSignature));
-
- job.getConfiguration().set(
- HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE, jcTokenSignature);
- job.getConfiguration().set(
- HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM,
- tokenMap.get(jcTokenSignature).encodeToUrlString());
- // LOG.info("Set hive dt["+tokenSignature+"]");
- // LOG.info("Set jt dt["+jcTokenSignature+"]");
- }
- }
- }
+ try {
+ UserGroupInformation.class.getMethod("isSecurityEnabled");
+ Security.getInstance().handleSecurity(job, outputJobInfo, client, conf, harRequested);
+ } catch (NoSuchMethodException e) {
+ LOG.info("Security is not supported by this version of hadoop.");
+ }
} catch(Exception e) {
if( e instanceof HCatException ) {
throw (HCatException) e;
@@ -264,27 +188,6 @@ public class HCatOutputFormat extends HC
}
}
- // a signature string to associate with a HCatTableInfo - essentially
- // a concatenation of dbname, tablename and partition keyvalues.
- private static String getTokenSignature(OutputJobInfo outputJobInfo) {
- StringBuilder result = new StringBuilder("");
- String dbName = outputJobInfo.getDatabaseName();
- if(dbName != null) {
- result.append(dbName);
- }
- String tableName = outputJobInfo.getTableName();
- if(tableName != null) {
- result.append("+" + tableName);
- }
- Map<String, String> partValues = outputJobInfo.getPartitionValues();
- if(partValues != null) {
- for(Entry<String, String> entry: partValues.entrySet()) {
- result.append("+" + entry.getKey() + "=" + entry.getValue());
- }
- }
- return result.toString();
- }
-
/**
* Set the schema for the data being written out to the partition. The
* table schema is used by default for the partition if this is not called.
@@ -331,7 +234,13 @@ public class HCatOutputFormat extends HC
static HiveMetaStoreClient createHiveClient(String url, Configuration conf) throws IOException, MetaException {
HiveConf hiveConf = getHiveConf(url, conf);
// HCatUtil.logHiveConf(LOG, hiveConf);
- return new HiveMetaStoreClient(hiveConf);
+ try {
+ return new HiveMetaStoreClient(hiveConf);
+ } catch (MetaException e) {
+ LOG.error("Error connecting to the metastore (conf follows): "+e.getMessage(), e);
+ HCatUtil.logHiveConf(LOG, hiveConf);
+ throw e;
+ }
}
@@ -343,7 +252,7 @@ public class HCatOutputFormat extends HC
hiveConf.set("hive.metastore.local", "false");
hiveConf.set(ConfVars.METASTOREURIS.varname, url);
-
+
String kerberosPrincipal = conf.get(HCatConstants.HCAT_METASTORE_PRINCIPAL);
if (kerberosPrincipal == null){
kerberosPrincipal = conf.get(ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname);
@@ -351,7 +260,7 @@ public class HCatOutputFormat extends HC
if (kerberosPrincipal != null){
hiveConf.setBoolean(ConfVars.METASTORE_USE_THRIFT_SASL.varname, true);
hiveConf.set(ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, kerberosPrincipal);
- }
+ }
if(conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
hiveConf.set("hive.metastore.token.signature", conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
}
@@ -381,12 +290,12 @@ public class HCatOutputFormat extends HC
}
}
-
+
// figure out what the maximum number of partitions allowed is, so we can pass it on to our outputinfo
if (HCatConstants.HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED){
maxDynamicPartitions = hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS);
}else{
- maxDynamicPartitions = -1; // disables bounds checking for maximum number of dynamic partitions
+ maxDynamicPartitions = -1; // disables bounds checking for maximum number of dynamic partitions
}
harRequested = hiveConf.getBoolVar(HiveConf.ConfVars.HIVEARCHIVEENABLED);
return hiveConf;
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/Security.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/Security.java?rev=1220971&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/Security.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/Security.java Mon Dec 19 22:02:54 2011
@@ -0,0 +1,138 @@
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.thrift.TException;
+
+final class Security {
+
+ // making sure this is not initialized unless needed
+ private static final class LazyHolder {
+ public static final Security INSTANCE = new Security();
+ }
+
+ private static Map<String, Token<? extends AbstractDelegationTokenIdentifier>> tokenMap = new HashMap<String, Token<? extends AbstractDelegationTokenIdentifier>>();
+
+ public static Security getInstance() {
+ return LazyHolder.INSTANCE;
+ }
+
+ // a signature string to associate with a HCatTableInfo - essentially
+ // a concatenation of dbname, tablename and partition keyvalues.
+ private String getTokenSignature(OutputJobInfo outputJobInfo) {
+ StringBuilder result = new StringBuilder("");
+ String dbName = outputJobInfo.getDatabaseName();
+ if(dbName != null) {
+ result.append(dbName);
+ }
+ String tableName = outputJobInfo.getTableName();
+ if(tableName != null) {
+ result.append("+" + tableName);
+ }
+ Map<String, String> partValues = outputJobInfo.getPartitionValues();
+ if(partValues != null) {
+ for(Entry<String, String> entry: partValues.entrySet()) {
+ result.append("+" + entry.getKey() + "=" + entry.getValue());
+ }
+ }
+ return result.toString();
+ }
+
+ void handleSecurity(
+ Job job,
+ OutputJobInfo outputJobInfo,
+ HiveMetaStoreClient client,
+ Configuration conf,
+ boolean harRequested)
+ throws IOException, MetaException, TException, Exception {
+ if(UserGroupInformation.isSecurityEnabled()){
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ // check if oozie has set up a hcat deleg. token - if so use it
+ TokenSelector<? extends TokenIdentifier> tokenSelector = new DelegationTokenSelector();
+ // TODO: will oozie use a "service" called "oozie" - then instead of
+ // new Text() do new Text("oozie") below - if this change is made also
+ // remember to do:
+ // job.getConfiguration().set(HCAT_KEY_TOKEN_SIGNATURE, "oozie");
+ // Also change code in OutputCommitter.cleanupJob() to cancel the
+ // token only if token.service is not "oozie" - remove the condition of
+ // HCAT_KEY_TOKEN_SIGNATURE != null in that code.
+ Token<? extends TokenIdentifier> token = tokenSelector.selectToken(
+ new Text(), ugi.getTokens());
+ if(token != null) {
+
+ job.getCredentials().addToken(new Text(ugi.getUserName()),token);
+
+ } else {
+
+ // we did not get token set up by oozie, let's get them ourselves here.
+ // we essentially get a token per unique Output HCatTableInfo - this is
+ // done because through Pig, setOutput() method is called multiple times
+ // We want to only get the token once per unique output HCatTableInfo -
+ // we cannot just get one token since in multi-query case (> 1 store in 1 job)
+ // or the case when a single pig script results in > 1 jobs, the single
+ // token will get cancelled by the output committer and the subsequent
+ // stores will fail - by tying the token with the concatenation of
+ // dbname, tablename and partition keyvalues of the output
+ // TableInfo, we can have as many tokens as there are stores and the TokenSelector
+ // will correctly pick the right tokens which the committer will use and
+ // cancel.
+
+ String tokenSignature = getTokenSignature(outputJobInfo);
+ if(tokenMap.get(tokenSignature) == null) {
+ // get delegation tokens from hcat server and store them into the "job"
+ // These will be used in to publish partitions to
+ // hcat normally in OutputCommitter.commitJob()
+ // when the JobTracker in Hadoop MapReduce starts supporting renewal of
+ // arbitrary tokens, the renewer should be the principal of the JobTracker
+ tokenMap.put(tokenSignature, HCatUtil.extractThriftToken(
+ client.getDelegationToken(ugi.getUserName()),
+ tokenSignature));
+ }
+
+ String jcTokenSignature = "jc."+tokenSignature;
+ if (harRequested){
+ if(tokenMap.get(jcTokenSignature) == null) {
+ tokenMap.put(jcTokenSignature,
+ HCatUtil.getJobTrackerDelegationToken(conf,ugi.getUserName()));
+ }
+ }
+
+ job.getCredentials().addToken(new Text(ugi.getUserName() + tokenSignature),
+ tokenMap.get(tokenSignature));
+ // this will be used by the outputcommitter to pass on to the metastore client
+ // which in turn will pass on to the TokenSelector so that it can select
+ // the right token.
+ job.getConfiguration().set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE, tokenSignature);
+
+ if (harRequested){
+ job.getCredentials().addToken(new Text(ugi.getUserName() + jcTokenSignature),
+ tokenMap.get(jcTokenSignature));
+
+ job.getConfiguration().set(
+ HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE, jcTokenSignature);
+ job.getConfiguration().set(
+ HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM,
+ tokenMap.get(jcTokenSignature).encodeToUrlString());
+ // LOG.info("Set hive dt["+tokenSignature+"]");
+ // LOG.info("Set jt dt["+jcTokenSignature+"]");
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
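The new Security class keeps its singleton in a private nested LazyHolder class (the initialization-on-demand holder idiom): the JVM initializes the holder, and therefore the instance, only on the first getInstance() call, and class initialization guarantees this happens exactly once without explicit synchronization. A generic sketch of the idiom, with HeavyService as a hypothetical example class unrelated to the patch:

    // Initialization-on-demand holder idiom; HeavyService is a hypothetical class
    // whose construction should be deferred until it is actually needed.
    final class HeavyService {

        private HeavyService() {
            // expensive setup would go here
        }

        private static final class LazyHolder {
            // Initialized when LazyHolder is first referenced, i.e. on the first
            // getInstance() call; JVM class initialization makes this thread-safe.
            static final HeavyService INSTANCE = new HeavyService();
        }

        static HeavyService getInstance() {
            return LazyHolder.INSTANCE;
        }
    }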