You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2013/11/09 02:44:14 UTC
svn commit: r1540241 - in
/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase:
mapred/TableMapReduceUtil.java mapreduce/LoadIncrementalHFiles.java
mapreduce/TableMapReduceUtil.java security/User.java
Author: mbertozzi
Date: Sat Nov 9 01:44:13 2013
New Revision: 1540241
URL: http://svn.apache.org/r1540241
Log:
HBASE-9890 MR jobs are not working if started by a delegated user
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/security/User.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java?rev=1540241&r1=1540240&r2=1540241&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java Sat Nov 9 01:44:13 2013
@@ -21,12 +21,15 @@ package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.UserProvider;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.zookeeper.ClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
@@ -35,6 +38,8 @@ import org.apache.hadoop.mapred.InputFor
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.security.token.Token;
+import org.apache.zookeeper.KeeperException;
/**
* Utility for {@link TableMap} and {@link TableReduce}
@@ -171,9 +176,24 @@ public class TableMapReduceUtil {
public static void initCredentials(JobConf job) throws IOException {
UserProvider provider = UserProvider.instantiate(job);
+
+ if (provider.isHadoopSecurityEnabled()) {
+ // propagate delegation related props from launcher job to MR job
+ if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
+ job.set("mapreduce.job.credentials.binary",
+ System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
+ }
+ }
+
if (provider.isHBaseSecurityEnabled()) {
try {
- provider.getCurrent().obtainAuthTokenForJob(job);
+ User user = provider.getCurrent();
+ Token<?> authToken = getAuthToken(job, user);
+ if (authToken == null) {
+ user.obtainAuthTokenForJob(job);
+ } else {
+ job.getCredentials().addToken(authToken.getService(), authToken);
+ }
} catch (InterruptedException ie) {
ie.printStackTrace();
Thread.interrupted();
@@ -182,6 +202,23 @@ public class TableMapReduceUtil {
}
/**
+ * Looks up the user's existing HBASE_AUTH_TOKEN for the cluster whose id is read from ZooKeeper.
+ * @return the matching token, or null if the user does not hold one for that cluster.
+ */
+ private static Token<?> getAuthToken(Configuration conf, User user)
+     throws IOException, InterruptedException {
+   final ZooKeeperWatcher watcher = new ZooKeeperWatcher(conf, "mr-init-credentials", null);
+   try {
+     // The cluster id read from ZooKeeper is passed as the service the token must match.
+     return user.getToken("HBASE_AUTH_TOKEN", ClusterId.readClusterIdZNode(watcher));
+   } catch (KeeperException zkFailure) {
+     throw new IOException(zkFailure);
+   } finally {
+     watcher.close();
+   }
+ }
+
+ /**
* Ensures that the given number of reduce tasks for the given job
* configuration does not exceed the number of regions for the given table.
*
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1540241&r1=1540240&r2=1540241&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Sat Nov 9 01:44:13 2013
@@ -101,6 +101,7 @@ public class LoadIncrementalHFiles exten
public static String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
private boolean useSecure;
+ private boolean hasForwardedToken;
private Token<?> userToken;
private String bulkToken;
private final boolean assignSeqIds;
@@ -232,7 +233,15 @@ public class LoadIncrementalHFiles exten
//Since delegation token doesn't work in mini cluster
if (userProvider.isHadoopSecurityEnabled()) {
FileSystem fs = FileSystem.get(cfg);
- userToken = fs.getDelegationToken("renewer");
+ userToken = userProvider.getCurrent().getToken("HDFS_DELEGATION_TOKEN",
+ fs.getCanonicalServiceName());
+ if (userToken == null) {
+ hasForwardedToken = false;
+ userToken = fs.getDelegationToken("renewer");
+ } else {
+ hasForwardedToken = true;
+ LOG.info("Use the existing token: " + userToken);
+ }
}
bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getTableName());
}
@@ -266,7 +275,7 @@ public class LoadIncrementalHFiles exten
} finally {
if(useSecure) {
- if(userToken != null) {
+ if(userToken != null && !hasForwardedToken) {
try {
userToken.cancel(cfg);
} catch (Exception e) {
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=1540241&r1=1540240&r2=1540241&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Sat Nov 9 01:44:13 2013
@@ -53,12 +53,16 @@ import org.apache.hadoop.hbase.mapreduce
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.zookeeper.KeeperException;
/**
* Utility for {@link TableMapper} and {@link TableReducer}
@@ -284,8 +288,17 @@ public class TableMapReduceUtil {
}
}
- public static void initCredentials(Job job) throws IOException {
+public static void initCredentials(Job job) throws IOException {
UserProvider provider = UserProvider.instantiate(job.getConfiguration());
+
+ if (provider.isHadoopSecurityEnabled()) {
+ // propagate delegation related props from launcher job to MR job
+ if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
+ job.getConfiguration().set("mapreduce.job.credentials.binary",
+ System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
+ }
+ }
+
if (provider.isHBaseSecurityEnabled()) {
try {
// init credentials for remote cluster
@@ -299,10 +312,10 @@ public class TableMapReduceUtil {
peerConf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
peerConf.set("hbase.zookeeper.client.port", parts[1]);
peerConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[2]);
- user.obtainAuthTokenForJob(peerConf, job);
+ obtainAuthTokenForJob(job, peerConf, user);
}
-
- user.obtainAuthTokenForJob(job.getConfiguration(), job);
+
+ obtainAuthTokenForJob(job, job.getConfiguration(), user);
} catch (InterruptedException ie) {
LOG.info("Interrupted obtaining user authentication token");
Thread.interrupted();
@@ -310,6 +323,33 @@ public class TableMapReduceUtil {
}
}
+ private static void obtainAuthTokenForJob(Job job, Configuration conf, User user)
+     throws IOException, InterruptedException {
+   final Token<?> existing = getAuthToken(conf, user); // null: user holds no token yet
+   if (existing != null) {
+     job.getCredentials().addToken(existing.getService(), existing);
+     return;
+   }
+   user.obtainAuthTokenForJob(conf, job);
+ }
+
+ /**
+ * Looks up the user's existing HBASE_AUTH_TOKEN for the cluster whose id is read from ZooKeeper.
+ * @return the matching token, or null if the user does not hold one for that cluster.
+ */
+ private static Token<?> getAuthToken(Configuration conf, User user)
+     throws IOException, InterruptedException {
+   final ZooKeeperWatcher watcher = new ZooKeeperWatcher(conf, "mr-init-credentials", null);
+   try {
+     // The cluster id read from ZooKeeper is passed as the service the token must match.
+     return user.getToken("HBASE_AUTH_TOKEN", ClusterId.readClusterIdZNode(watcher));
+   } catch (KeeperException zkFailure) {
+     throw new IOException(zkFailure);
+   } finally {
+     watcher.close();
+   }
+ }
+
/**
* Writes the given scan into a Base64 encoded string.
*
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/security/User.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/security/User.java?rev=1540241&r1=1540240&r2=1540241&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/security/User.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/security/User.java Sat Nov 9 01:44:13 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HBaseConf
import org.apache.hadoop.hbase.util.Methods;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
@@ -127,6 +128,25 @@ public abstract class User {
public abstract void obtainAuthTokenForJob(JobConf job)
throws IOException, InterruptedException;
+ /**
+ * Finds the first token held by this user whose kind and service both match.
+ * A null service never matches, so the result is null in that case.
+ *
+ * @param kind the token kind to look for
+ * @param service the service the token must be issued for
+ * @return the first matching token, or null if none is present
+ */
+ public Token<?> getToken(String kind, String service) throws IOException {
+   for (Token<?> candidate : ugi.getTokens()) {
+     final boolean sameKind = candidate.getKind().toString().equals(kind);
+     if (sameKind && service != null
+         && candidate.getService().toString().equals(service)) {
+       return candidate;
+     }
+   }
+   return null;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {