You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ga...@apache.org on 2014/12/05 22:05:22 UTC

hbase git commit: HBASE-12493 Make User and TokenUtil public

Repository: hbase
Updated Branches:
  refs/heads/master 09cd3d7bf -> 1ec6609b9


HBASE-12493 Make User and TokenUtil public


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1ec6609b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1ec6609b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1ec6609b

Branch: refs/heads/master
Commit: 1ec6609b978033550d4ead35de0ab5344b789bd1
Parents: 09cd3d7
Author: Gary Helmling <ga...@cask.co>
Authored: Fri Dec 5 13:02:16 2014 -0800
Committer: Gary Helmling <ga...@cask.co>
Committed: Fri Dec 5 13:04:54 2014 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/security/token/TokenUtil.java  | 383 +++++++++++++++++++
 .../org/apache/hadoop/hbase/security/User.java  |  35 +-
 .../hadoop/hbase/mapred/TableMapReduceUtil.java |  30 +-
 .../hbase/mapreduce/TableMapReduceUtil.java     |  51 +--
 .../hadoop/hbase/security/token/TokenUtil.java  | 200 ----------
 .../security/token/TestTokenAuthentication.java |  37 +-
 6 files changed, 476 insertions(+), 260 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/1ec6609b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
new file mode 100644
index 0000000..d21c3e5
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.token;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Utility methods for obtaining authentication tokens.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class TokenUtil {
+  // This class is referenced indirectly by User out in common; instances are created by reflection
+  private static Log LOG = LogFactory.getLog(TokenUtil.class);
+
+  /**
+   * Obtain and return an authentication token for the current user.
+   * @param conf the configuration for connecting to the cluster
+   * @return the authentication token instance
+   * @deprecated Replaced by {@link #obtainToken(Connection)}
+   */
+  @Deprecated
+  public static Token<AuthenticationTokenIdentifier> obtainToken(
+      Configuration conf) throws IOException {
+    try (Connection connection = ConnectionFactory.createConnection(conf)) {
+      return obtainToken(connection);
+    }
+  }
+
+  /**
+   * Obtain and return an authentication token for the current user.
+   * @param conn The HBase cluster connection
+   * @return the authentication token instance
+   */
+  public static Token<AuthenticationTokenIdentifier> obtainToken(
+      Connection conn) throws IOException {
+    Table meta = null;
+    try {
+      meta = conn.getTable(TableName.META_TABLE_NAME);
+      CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW);
+      AuthenticationProtos.AuthenticationService.BlockingInterface service =
+          AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
+      AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null,
+          AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance());
+
+      return ProtobufUtil.toToken(response.getToken());
+    } catch (ServiceException se) {
+      ProtobufUtil.toIOException(se);
+    } finally {
+      if (meta != null) {
+        meta.close();
+      }
+    }
+    // dummy return for ServiceException block
+    return null;
+  }
+
+  /**
+   * Obtain and return an authentication token for the current user.
+   * @param conn The HBase cluster connection
+   * @return the authentication token instance
+   */
+  public static Token<AuthenticationTokenIdentifier> obtainToken(
+      final Connection conn, User user) throws IOException, InterruptedException {
+    return user.runAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() {
+      @Override
+      public Token<AuthenticationTokenIdentifier> run() throws Exception {
+        return obtainToken(conn);
+      }
+    });
+  }
+
+
+  private static Text getClusterId(Token<AuthenticationTokenIdentifier> token)
+      throws IOException {
+    return token.getService() != null
+        ? token.getService() : new Text("default");
+  }
+
+  /**
+   * Obtain an authentication token for the given user and add it to the
+   * user's credentials.
+   * @param conf The configuration for connecting to the cluster
+   * @param user The user for whom to obtain the token
+   * @throws IOException If making a remote call to the
+   *     {@link AuthenticationProtos.AuthenticationService} fails
+   * @throws InterruptedException If executing as the given user is interrupted
+   * @deprecated Replaced by {@link #obtainAndCacheToken(Connection,User)}
+   */
+  @Deprecated
+  public static void obtainAndCacheToken(final Configuration conf,
+                                         UserGroupInformation user)
+      throws IOException, InterruptedException {
+    Connection conn = ConnectionFactory.createConnection(conf);
+    try {
+      UserProvider userProvider = UserProvider.instantiate(conf);
+      obtainAndCacheToken(conn, userProvider.create(user));
+    } finally {
+      conn.close();
+    }
+  }
+
+  /**
+   * Obtain an authentication token for the given user and add it to the
+   * user's credentials.
+   * @param conn The HBase cluster connection
+   * @param user The user for whom to obtain the token
+   * @throws IOException If making a remote call to the
+   *     {@link AuthenticationProtos.AuthenticationService} fails
+   * @throws InterruptedException If executing as the given user is interrupted
+   */
+  public static void obtainAndCacheToken(final Connection conn,
+      User user)
+      throws IOException, InterruptedException {
+    try {
+      Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
+
+      if (token == null) {
+        throw new IOException("No token returned for user " + user.getName());
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
+            user.getName());
+      }
+      user.addToken(token);
+    } catch (IOException ioe) {
+      throw ioe;
+    } catch (InterruptedException ie) {
+      throw ie;
+    } catch (RuntimeException re) {
+      throw re;
+    } catch (Exception e) {
+      throw new UndeclaredThrowableException(e,
+          "Unexpected exception obtaining token for user " + user.getName());
+    }
+  }
+
+  /**
+   * Obtain an authentication token on behalf of the given user and add it to
+   * the credentials for the given map reduce job.
+   * @param conf The configuration for connecting to the cluster
+   * @param user The user for whom to obtain the token
+   * @param job The job instance in which the token should be stored
+   * @throws IOException If making a remote call to the
+   *     {@link AuthenticationProtos.AuthenticationService} fails
+   * @throws InterruptedException If executing as the given user is interrupted
+   * @deprecated Replaced by {@link #obtainTokenForJob(Connection,User,Job)}
+   */
+  @Deprecated
+  public static void obtainTokenForJob(final Configuration conf,
+                                       UserGroupInformation user, Job job)
+      throws IOException, InterruptedException {
+    Connection conn = ConnectionFactory.createConnection(conf);
+    try {
+      UserProvider userProvider = UserProvider.instantiate(conf);
+      obtainTokenForJob(conn, userProvider.create(user), job);
+    } finally {
+      conn.close();
+    }
+  }
+
+  /**
+   * Obtain an authentication token on behalf of the given user and add it to
+   * the credentials for the given map reduce job.
+   * @param conn The HBase cluster connection
+   * @param user The user for whom to obtain the token
+   * @param job The job instance in which the token should be stored
+   * @throws IOException If making a remote call to the
+   *     {@link AuthenticationProtos.AuthenticationService} fails
+   * @throws InterruptedException If executing as the given user is interrupted
+   */
+  public static void obtainTokenForJob(final Connection conn,
+      User user, Job job)
+      throws IOException, InterruptedException {
+    try {
+      Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
+
+      if (token == null) {
+        throw new IOException("No token returned for user " + user.getName());
+      }
+      Text clusterId = getClusterId(token);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
+            user.getName() + " on cluster " + clusterId.toString());
+      }
+      job.getCredentials().addToken(clusterId, token);
+    } catch (IOException ioe) {
+      throw ioe;
+    } catch (InterruptedException ie) {
+      throw ie;
+    } catch (RuntimeException re) {
+      throw re;
+    } catch (Exception e) {
+      throw new UndeclaredThrowableException(e,
+          "Unexpected exception obtaining token for user " + user.getName());
+    }
+  }
+
+  /**
+   * Obtain an authentication token on behalf of the given user and add it to
+   * the credentials for the given map reduce job.
+   * @param user The user for whom to obtain the token
+   * @param job The job configuration in which the token should be stored
+   * @throws IOException If making a remote call to the
+   *     {@link AuthenticationProtos.AuthenticationService} fails
+   * @throws InterruptedException If executing as the given user is interrupted
+   * @deprecated Replaced by {@link #obtainTokenForJob(Connection,JobConf,User)}
+   */
+  @Deprecated
+  public static void obtainTokenForJob(final JobConf job,
+                                       UserGroupInformation user)
+      throws IOException, InterruptedException {
+    Connection conn = ConnectionFactory.createConnection(job);
+    try {
+      UserProvider userProvider = UserProvider.instantiate(job);
+      obtainTokenForJob(conn, job, userProvider.create(user));
+    } finally {
+      conn.close();
+    }
+  }
+
+  /**
+   * Obtain an authentication token on behalf of the given user and add it to
+   * the credentials for the given map reduce job.
+   * @param conn The HBase cluster connection
+   * @param user The user for whom to obtain the token
+   * @param job The job configuration in which the token should be stored
+   * @throws IOException If making a remote call to the
+   *     {@link AuthenticationProtos.AuthenticationService} fails
+   * @throws InterruptedException If executing as the given user is interrupted
+   */
+  public static void obtainTokenForJob(final Connection conn, final JobConf job, User user)
+      throws IOException, InterruptedException {
+    try {
+      Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
+
+      if (token == null) {
+        throw new IOException("No token returned for user " + user.getName());
+      }
+      Text clusterId = getClusterId(token);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
+            user.getName() + " on cluster " + clusterId.toString());
+      }
+      job.getCredentials().addToken(clusterId, token);
+    } catch (IOException ioe) {
+      throw ioe;
+    } catch (InterruptedException ie) {
+      throw ie;
+    } catch (RuntimeException re) {
+      throw re;
+    } catch (Exception e) {
+      throw new UndeclaredThrowableException(e,
+          "Unexpected exception obtaining token for user "+user.getName());
+    }
+  }
+
+  /**
+   * Checks for an authentication token for the given user, obtaining a new token if necessary,
+   * and adds it to the credentials for the given map reduce job.
+   *
+   * @param conn The HBase cluster connection
+   * @param user The user for whom to obtain the token
+   * @param job The job configuration in which the token should be stored
+   * @throws IOException If making a remote call to the
+   *     {@link AuthenticationProtos.AuthenticationService} fails
+   * @throws InterruptedException If executing as the given user is interrupted
+   */
+  public static void addTokenForJob(final Connection conn, final JobConf job, User user)
+      throws IOException, InterruptedException {
+
+    Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
+    if (token == null) {
+      token = obtainToken(conn, user);
+    }
+    job.getCredentials().addToken(token.getService(), token);
+  }
+
+  /**
+   * Checks for an authentication token for the given user, obtaining a new token if necessary,
+   * and adds it to the credentials for the given map reduce job.
+   *
+   * @param conn The HBase cluster connection
+   * @param user The user for whom to obtain the token
+   * @param job The job instance in which the token should be stored
+   * @throws IOException If making a remote call to the
+   *     {@link AuthenticationProtos.AuthenticationService} fails
+   * @throws InterruptedException If executing as the given user is interrupted
+   */
+  public static void addTokenForJob(final Connection conn, User user, Job job)
+      throws IOException, InterruptedException {
+    Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
+    if (token == null) {
+      token = obtainToken(conn, user);
+    }
+    job.getCredentials().addToken(token.getService(), token);
+  }
+
+  /**
+   * Checks if an authentication tokens exists for the connected cluster,
+   * obtaining one if needed and adding it to the user's credentials.
+   *
+   * @param conn The HBase cluster connection
+   * @param user The user for whom to obtain the token
+   * @throws IOException If making a remote call to the
+   *     {@link AuthenticationProtos.AuthenticationService} fails
+   * @throws InterruptedException If executing as the given user is interrupted
+   * @return true if the token was added, false if it already existed
+   */
+  public static boolean addTokenIfMissing(Connection conn, User user)
+      throws IOException, InterruptedException {
+    Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
+    if (token == null) {
+      token = obtainToken(conn, user);
+      user.getUGI().addToken(token.getService(), token);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Get the authentication token of the user for the cluster specified in the configuration
+   * @return null if the user does not have the token, otherwise the auth token for the cluster.
+   */
+  private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
+      throws IOException, InterruptedException {
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "TokenUtil-getAuthToken", null);
+    try {
+      String clusterId = ZKClusterId.readClusterIdZNode(zkw);
+      if (clusterId == null) {
+        throw new IOException("Failed to get cluster ID");
+      }
+      return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getTokens());
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    } finally {
+      zkw.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ec6609b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java
index 5f316f2..a5ac51a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java
@@ -23,15 +23,18 @@ import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.util.Methods;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 
 /**
  * Wrapper to abstract out usage of user and group information in HBase.
@@ -44,7 +47,8 @@ import org.apache.hadoop.security.token.Token;
  * HBase, but can be extended as needs change.
  * </p>
  */
-@InterfaceAudience.Private
+@InterfaceAudience.Public
+@InterfaceStability.Stable
 public abstract class User {
   public static final String HBASE_SECURITY_CONF_KEY =
       "hbase.security.authentication";
@@ -58,6 +62,7 @@ public abstract class User {
   /**
    * Returns the full user name.  For Kerberos principals this will include
    * the host and realm portions of the principal name.
+   *
    * @return User full name.
    */
   public String getName() {
@@ -76,6 +81,7 @@ public abstract class User {
   /**
    * Returns the shortened version of the user name -- the portion that maps
    * to an operating system user name.
+   *
    * @return Short name
    */
   public abstract String getShortName();
@@ -96,7 +102,10 @@ public abstract class User {
    * user's credentials.
    *
    * @throws IOException
+   * @deprecated Use {@code TokenUtil.obtainAuthTokenForJob(Connection,User,Job)}
+   *     instead.
    */
+  @Deprecated
   public abstract void obtainAuthTokenForJob(Configuration conf, Job job)
       throws IOException, InterruptedException;
 
@@ -105,7 +114,10 @@ public abstract class User {
    * user's credentials.
    *
    * @throws IOException
+   * @deprecated Use {@code TokenUtil.obtainAuthTokenForJob(Connection,JobConf,User)}
+   *     instead.
    */
+  @Deprecated
   public abstract void obtainAuthTokenForJob(JobConf job)
       throws IOException, InterruptedException;
 
@@ -118,16 +130,31 @@ public abstract class User {
    * @return the token of the specified kind.
    */
   public Token<?> getToken(String kind, String service) throws IOException {
-    for (Token<?> token: ugi.getTokens()) {
+    for (Token<?> token : ugi.getTokens()) {
       if (token.getKind().toString().equals(kind) &&
-          (service != null && token.getService().toString().equals(service)))
-      {
+          (service != null && token.getService().toString().equals(service))) {
         return token;
       }
     }
     return null;
   }
 
+  /**
+   * Returns all the tokens stored in the user's credentials.
+   */
+  public Collection<Token<? extends TokenIdentifier>> getTokens() {
+    return ugi.getTokens();
+  }
+
+  /**
+   * Adds the given Token to the user's credentials.
+   *
+   * @param token the token to add
+   */
+  public void addToken(Token<? extends TokenIdentifier> token) {
+    ugi.addToken(token);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ec6609b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
index 1afb9d6..b5fefbb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
@@ -35,6 +37,7 @@ import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.TokenUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.io.Text;
@@ -236,40 +239,21 @@ public class TableMapReduceUtil {
     }
 
     if (userProvider.isHBaseSecurityEnabled()) {
+      Connection conn = ConnectionFactory.createConnection(job);
       try {
         // login the server principal (if using secure Hadoop)
         User user = userProvider.getCurrent();
-        Token<AuthenticationTokenIdentifier> authToken = getAuthToken(job, user);
-        if (authToken == null) {
-          user.obtainAuthTokenForJob(job);
-        } else {
-          job.getCredentials().addToken(authToken.getService(), authToken);
-        }
+        TokenUtil.addTokenForJob(conn, job, user);
       } catch (InterruptedException ie) {
         ie.printStackTrace();
         Thread.currentThread().interrupt();
+      } finally {
+        conn.close();
       }
     }
   }
 
   /**
-   * Get the authentication token of the user for the cluster specified in the configuration
-   * @return null if the user does not have the token, otherwise the auth token for the cluster.
-   */
-  private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
-      throws IOException, InterruptedException {
-    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "mr-init-credentials", null);
-    try {
-      String clusterId = ZKClusterId.readClusterIdZNode(zkw);
-      return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getUGI().getTokens());
-    } catch (KeeperException e) {
-      throw new IOException(e);
-    } finally {
-      zkw.close();
-    }
-  }
-
-  /**
    * Ensures that the given number of reduce tasks for the given job
    * configuration does not exceed the number of regions for the given table.
    *

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ec6609b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index b514941..ab99e1a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -43,6 +43,8 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -52,6 +54,7 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
+import org.apache.hadoop.hbase.security.token.TokenUtil;
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@@ -452,10 +455,20 @@ public class TableMapReduceUtil {
         if (quorumAddress != null) {
           Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
           ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
-          obtainAuthTokenForJob(job, peerConf, user);
+          Connection peerConn = ConnectionFactory.createConnection(peerConf);
+          try {
+            TokenUtil.addTokenForJob(peerConn, user, job);
+          } finally {
+            peerConn.close();
+          }
         }
 
-        obtainAuthTokenForJob(job, job.getConfiguration(), user);
+        Connection conn = ConnectionFactory.createConnection(job.getConfiguration());
+        try {
+          TokenUtil.addTokenForJob(conn, user, job);
+        } finally {
+          conn.close();
+        }
       } catch (InterruptedException ie) {
         LOG.info("Interrupted obtaining user authentication token");
         Thread.currentThread().interrupt();
@@ -481,7 +494,12 @@ public class TableMapReduceUtil {
       try {
         Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
         ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
-        obtainAuthTokenForJob(job, peerConf, userProvider.getCurrent());
+        Connection peerConn = ConnectionFactory.createConnection(peerConf);
+        try {
+          TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job);
+        } finally {
+          peerConn.close();
+        }
       } catch (InterruptedException e) {
         LOG.info("Interrupted obtaining user authentication token");
         Thread.interrupted();
@@ -489,33 +507,6 @@ public class TableMapReduceUtil {
     }
   }
 
-  private static void obtainAuthTokenForJob(Job job, Configuration conf, User user)
-      throws IOException, InterruptedException {
-    Token<AuthenticationTokenIdentifier> authToken = getAuthToken(conf, user);
-    if (authToken == null) {
-      user.obtainAuthTokenForJob(conf, job);
-    } else {
-      job.getCredentials().addToken(authToken.getService(), authToken);
-    }
-  }
-
-  /**
-   * Get the authentication token of the user for the cluster specified in the configuration
-   * @return null if the user does not have the token, otherwise the auth token for the cluster.
-   */
-  private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
-      throws IOException, InterruptedException {
-    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "mr-init-credentials", null);
-    try {
-      String clusterId = ZKClusterId.readClusterIdZNode(zkw);
-      return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getUGI().getTokens());
-    } catch (KeeperException e) {
-      throw new IOException(e);
-    } finally {
-      zkw.close();
-    }
-  }
-
   /**
    * Writes the given scan into a Base64 encoded string.
    *

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ec6609b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
deleted file mode 100644
index e3c4f53..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.security.token;
-
-import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.security.PrivilegedExceptionAction;
-
-import com.google.protobuf.ServiceException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-
-/**
- * Utility methods for obtaining authentication tokens.
- */
-@InterfaceAudience.Private
-public class TokenUtil {
-  // This class is referenced indirectly by User out in common; instances are created by reflection
-  private static Log LOG = LogFactory.getLog(TokenUtil.class);
-
-  /**
-   * Obtain and return an authentication token for the current user.
-   * @param conf The configuration for connecting to the cluster
-   * @return the authentication token instance
-   */
-  public static Token<AuthenticationTokenIdentifier> obtainToken(
-      Configuration conf) throws IOException {
-    // TODO: Pass in a Connection to used. Will this even work?
-    try (Connection connection = ConnectionFactory.createConnection(conf)) {
-      try (Table meta = connection.getTable(TableName.META_TABLE_NAME)) {
-        CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW);
-        AuthenticationProtos.AuthenticationService.BlockingInterface service =
-            AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
-        AuthenticationProtos.GetAuthenticationTokenResponse response =
-          service.getAuthenticationToken(null,
-            AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance());
-
-        return ProtobufUtil.toToken(response.getToken());
-      } catch (ServiceException se) {
-        ProtobufUtil.toIOException(se);
-      }
-    }
-    // dummy return for ServiceException catch block
-    return null;
-  }
-
-  private static Text getClusterId(Token<AuthenticationTokenIdentifier> token)
-      throws IOException {
-    return token.getService() != null
-        ? token.getService() : new Text("default");
-  }
-
-  /**
-   * Obtain an authentication token for the given user and add it to the
-   * user's credentials.
-   * @param conf The configuration for connecting to the cluster
-   * @param user The user for whom to obtain the token
-   * @throws IOException If making a remote call to the {@link TokenProvider} fails
-   * @throws InterruptedException If executing as the given user is interrupted
-   */
-  public static void obtainAndCacheToken(final Configuration conf,
-      UserGroupInformation user)
-      throws IOException, InterruptedException {
-    try {
-      Token<AuthenticationTokenIdentifier> token =
-          user.doAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() {
-            public Token<AuthenticationTokenIdentifier> run() throws Exception {
-              return obtainToken(conf);
-            }
-          });
-
-      if (token == null) {
-        throw new IOException("No token returned for user "+user.getUserName());
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Obtained token "+token.getKind().toString()+" for user "+
-            user.getUserName());
-      }
-      user.addToken(token);
-    } catch (IOException ioe) {
-      throw ioe;
-    } catch (InterruptedException ie) {
-      throw ie;
-    } catch (RuntimeException re) {
-      throw re;
-    } catch (Exception e) {
-      throw new UndeclaredThrowableException(e,
-          "Unexpected exception obtaining token for user "+user.getUserName());
-    }
-  }
-
-  /**
-   * Obtain an authentication token on behalf of the given user and add it to
-   * the credentials for the given map reduce job.
-   * @param conf The configuration for connecting to the cluster
-   * @param user The user for whom to obtain the token
-   * @param job The job instance in which the token should be stored
-   * @throws IOException If making a remote call to the {@link TokenProvider} fails
-   * @throws InterruptedException If executing as the given user is interrupted
-   */
-  public static void obtainTokenForJob(final Configuration conf,
-      UserGroupInformation user, Job job)
-      throws IOException, InterruptedException {
-    try {
-      Token<AuthenticationTokenIdentifier> token =
-          user.doAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() {
-            public Token<AuthenticationTokenIdentifier> run() throws Exception {
-              return obtainToken(conf);
-            }
-          });
-
-      if (token == null) {
-        throw new IOException("No token returned for user "+user.getUserName());
-      }
-      Text clusterId = getClusterId(token);
-      LOG.info("Obtained token "+token.getKind().toString()+" for user "+
-          user.getUserName() + " on cluster "+clusterId.toString());
-      job.getCredentials().addToken(clusterId, token);
-    } catch (IOException ioe) {
-      throw ioe;
-    } catch (InterruptedException ie) {
-      throw ie;
-    } catch (RuntimeException re) {
-      throw re;
-    } catch (Exception e) {
-      throw new UndeclaredThrowableException(e,
-          "Unexpected exception obtaining token for user "+user.getUserName());
-    }
-  }
-
-  /**
-   * Obtain an authentication token on behalf of the given user and add it to
-   * the credentials for the given map reduce job.
-   * @param user The user for whom to obtain the token
-   * @param job The job configuration in which the token should be stored
-   * @throws IOException If making a remote call to the {@link TokenProvider} fails
-   * @throws InterruptedException If executing as the given user is interrupted
-   */
-  public static void obtainTokenForJob(final JobConf job,
-      UserGroupInformation user)
-      throws IOException, InterruptedException {
-    try {
-      Token<AuthenticationTokenIdentifier> token =
-          user.doAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() {
-            public Token<AuthenticationTokenIdentifier> run() throws Exception {
-              return obtainToken(job);
-            }
-          });
-
-      if (token == null) {
-        throw new IOException("No token returned for user "+user.getUserName());
-      }
-      Text clusterId = getClusterId(token);
-      LOG.info("Obtained token "+token.getKind().toString()+" for user "+
-          user.getUserName()+" on cluster "+clusterId.toString());
-      job.getCredentials().addToken(clusterId, token);
-    } catch (IOException ioe) {
-      throw ioe;
-    } catch (InterruptedException ie) {
-      throw ie;
-    } catch (RuntimeException re) {
-      throw re;
-    } catch (Exception e) {
-      throw new UndeclaredThrowableException(e,
-          "Unexpected exception obtaining token for user "+user.getUserName());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ec6609b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
index 6a62071..abdec58 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.security.token;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -41,6 +43,8 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
@@ -402,11 +406,11 @@ public class TestTokenAuthentication {
                 System.currentTimeMillis());
         try {
           BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn,
-            User.getCurrent(), HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+              User.getCurrent(), HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
           AuthenticationProtos.AuthenticationService.BlockingInterface stub =
-            AuthenticationProtos.AuthenticationService.newBlockingStub(channel);
+              AuthenticationProtos.AuthenticationService.newBlockingStub(channel);
           AuthenticationProtos.WhoAmIResponse response =
-            stub.whoAmI(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance());
+              stub.whoAmI(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance());
           String myname = response.getUsername();
           assertEquals("testuser", myname);
           String authMethod = response.getAuthMethod();
@@ -418,4 +422,31 @@ public class TestTokenAuthentication {
       }
     });
   }
+
+  @Test
+  public void testUseExistingToken() throws Exception {
+    User user = User.createUserForTesting(TEST_UTIL.getConfiguration(), "testuser2",
+        new String[]{"testgroup"});
+    Token<AuthenticationTokenIdentifier> token =
+        secretManager.generateToken(user.getName());
+    assertNotNull(token);
+    user.addToken(token);
+
+    // make sure we got a token
+    Token<AuthenticationTokenIdentifier> firstToken =
+        new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens());
+    assertNotNull(firstToken);
+    assertEquals(token, firstToken);
+
+    Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
+    try {
+      assertFalse(TokenUtil.addTokenIfMissing(conn, user));
+      // make sure we still have the same token
+      Token<AuthenticationTokenIdentifier> secondToken =
+          new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens());
+      assertEquals(firstToken, secondToken);
+    } finally {
+      conn.close();
+    }
+  }
 }