You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2013/11/09 02:44:16 UTC

svn commit: r1540242 - in /hbase/trunk: hbase-common/src/main/java/org/apache/hadoop/hbase/security/ hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/

Author: mbertozzi
Date: Sat Nov  9 01:44:16 2013
New Revision: 1540242

URL: http://svn.apache.org/r1540242
Log:
HBASE-9890 MR jobs are not working if started by a delegated user

Modified:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java?rev=1540242&r1=1540241&r2=1540242&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java Sat Nov  9 01:44:16 2013
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.util.Meth
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 
 /**
  * Wrapper to abstract out usage of user and group information in HBase.
@@ -111,6 +112,31 @@ public abstract class User {
   public abstract void obtainAuthTokenForJob(JobConf job)
       throws IOException, InterruptedException;
 
+  /**
+   * Looks through the tokens held by this user's UGI for one whose kind and
+   * service both match the given values.
+   *
+   * <p>A {@code null} service never matches, so passing {@code null} always
+   * yields {@code null}.
+   *
+   * @param kind the token kind to look for
+   * @param service the service name the token must be bound to
+   * @return the first matching token, or {@code null} if there is none
+   * @throws IOException as declared by the method signature
+   */
+  public Token<?> getToken(String kind, String service) throws IOException {
+    for (Token<?> candidate : ugi.getTokens()) {
+      boolean sameKind = candidate.getKind().toString().equals(kind);
+      if (!sameKind || service == null) {
+        continue;
+      }
+      if (candidate.getService().toString().equals(service)) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java?rev=1540242&r1=1540241&r2=1540242&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java Sat Nov  9 01:44:16 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.mapred;
 
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -28,8 +29,13 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
 import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.InputFormat;
@@ -37,6 +43,8 @@ import org.apache.hadoop.mapred.OutputFo
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.security.token.Token;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Utility for {@link TableMap} and {@link TableReduce}
@@ -178,10 +186,23 @@ public class TableMapReduceUtil {
 
   public static void initCredentials(JobConf job) throws IOException {
     UserProvider userProvider = UserProvider.instantiate(job);
-    // login the server principal (if using secure Hadoop)
+    if (userProvider.isHadoopSecurityEnabled()) {
+      // propagate delegation related props from launcher job to MR job
+      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
+        job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
+      }
+    }
+
     if (userProvider.isHBaseSecurityEnabled()) {
       try {
-        userProvider.getCurrent().obtainAuthTokenForJob(job);
+        // login the server principal (if using secure Hadoop)
+        User user = userProvider.getCurrent();
+        Token<AuthenticationTokenIdentifier> authToken = getAuthToken(job, user);
+        if (authToken == null) {
+          user.obtainAuthTokenForJob(job);
+        } else {
+          job.getCredentials().addToken(authToken.getService(), authToken);
+        }
       } catch (InterruptedException ie) {
         ie.printStackTrace();
         Thread.interrupted();
@@ -190,6 +211,33 @@ public class TableMapReduceUtil {
   }
 
   /**
+   * Get the authentication token of the user for the cluster specified in the configuration.
+   * The cluster ID is read through a ZooKeeperWatcher that is closed before returning.
+   *
+   * @param conf configuration handed to the ZooKeeperWatcher used to read the cluster ID
+   * @param user user whose UGI tokens are searched
+   * @return null if the user does not have the token, otherwise the auth token for the cluster.
+   * @throws IOException if no cluster ID could be read, or on a ZooKeeper error
+   */
+  private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
+      throws IOException, InterruptedException {
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "mr-init-credentials", null);
+    try {
+      String clusterId = ZKClusterId.readClusterIdZNode(zkw);
+      if (clusterId == null) {
+        // No cluster ID was read; fail clearly here instead of with an NPE from new Text(null).
+        throw new IOException("Failed to get cluster ID");
+      }
+      return new AuthenticationTokenSelector()
+          .selectToken(new Text(clusterId), user.getUGI().getTokens());
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    } finally {
+      zkw.close();
+    }
+  }
+
+  /**
    * Ensures that the given number of reduce tasks for the given job
    * configuration does not exceed the number of regions for the given table.
    *

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1540242&r1=1540241&r2=1540242&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Sat Nov  9 01:44:16 2013
@@ -106,6 +106,7 @@ public class LoadIncrementalHFiles exten
   private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
   private boolean assignSeqIds;
 
+  private boolean hasForwardedToken;
   private Token<?> userToken;
   private String bulkToken;
   private UserProvider userProvider;
@@ -254,7 +255,15 @@ public class LoadIncrementalHFiles exten
         //This condition is here for unit testing
         //Since delegation token doesn't work in mini cluster
         if (userProvider.isHadoopSecurityEnabled()) {
-         userToken = fs.getDelegationToken("renewer");
+          userToken = userProvider.getCurrent().getToken("HDFS_DELEGATION_TOKEN",
+                                                         fs.getCanonicalServiceName());
+          if (userToken == null) {
+            hasForwardedToken = false;
+            userToken = fs.getDelegationToken("renewer");
+          } else {
+            hasForwardedToken = true;
+            LOG.info("Use the existing token: " + userToken);
+          }
         }
         bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName());
       }
@@ -288,7 +297,7 @@ public class LoadIncrementalHFiles exten
 
     } finally {
       if (userProvider.isHBaseSecurityEnabled()) {
-        if(userToken != null) {
+        if (userToken != null && !hasForwardedToken) {
           try {
             userToken.cancel(cfg);
           } catch (Exception e) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=1540242&r1=1540241&r2=1540242&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Sat Nov  9 01:44:16 2013
@@ -50,14 +50,21 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.KeeperException;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 
@@ -295,16 +302,26 @@ public class TableMapReduceUtil {
 
   public static void initCredentials(Job job) throws IOException {
     UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
+    if (userProvider.isHadoopSecurityEnabled()) {
+      // propagate delegation related props from launcher job to MR job
+      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
+        job.getConfiguration().set("mapreduce.job.credentials.binary",
+                                   System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
+      }
+    }
+
     if (userProvider.isHBaseSecurityEnabled()) {
       try {
         // init credentials for remote cluster
         String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
+        User user = userProvider.getCurrent();
         if (quorumAddress != null) {
           Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
           ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
-          userProvider.getCurrent().obtainAuthTokenForJob(peerConf, job);
+          obtainAuthTokenForJob(job, peerConf, user);
         }
-        userProvider.getCurrent().obtainAuthTokenForJob(job.getConfiguration(), job);
+
+        obtainAuthTokenForJob(job, job.getConfiguration(), user);
       } catch (InterruptedException ie) {
         LOG.info("Interrupted obtaining user authentication token");
         Thread.interrupted();
@@ -312,6 +329,52 @@ public class TableMapReduceUtil {
     }
   }
 
+  /**
+   * Makes sure the job carries an HBase auth token for the cluster described by {@code conf}.
+   * A token the user already holds is added to the job credentials as-is; otherwise
+   * the user is asked to obtain a new one for the job.
+   *
+   * @param job job whose credentials receive the token
+   * @param conf configuration identifying the target cluster
+   * @param user user on whose behalf the token is looked up or obtained
+   */
+  private static void obtainAuthTokenForJob(Job job, Configuration conf, User user)
+      throws IOException, InterruptedException {
+    Token<AuthenticationTokenIdentifier> authToken = getAuthToken(conf, user);
+    if (authToken == null) {
+      user.obtainAuthTokenForJob(conf, job);
+    } else {
+      job.getCredentials().addToken(authToken.getService(), authToken);
+    }
+  }
+
+  /**
+   * Get the authentication token of the user for the cluster specified in the configuration.
+   * The cluster ID is read through a ZooKeeperWatcher that is closed before returning.
+   *
+   * @param conf configuration handed to the ZooKeeperWatcher used to read the cluster ID
+   * @param user user whose UGI tokens are searched
+   * @return null if the user does not have the token, otherwise the auth token for the cluster.
+   * @throws IOException if no cluster ID could be read, or on a ZooKeeper error
+   */
+  private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
+      throws IOException, InterruptedException {
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "mr-init-credentials", null);
+    try {
+      String clusterId = ZKClusterId.readClusterIdZNode(zkw);
+      if (clusterId == null) {
+        // No cluster ID was read; fail clearly here instead of with an NPE from new Text(null).
+        throw new IOException("Failed to get cluster ID");
+      }
+      return new AuthenticationTokenSelector()
+          .selectToken(new Text(clusterId), user.getUGI().getTokens());
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    } finally {
+      zkw.close();
+    }
+  }
+
   /**
    * Writes the given scan into a Base64 encoded string.
    *